Merge pull request #2309 from mitza-oci/warnings
[ACE_TAO.git] / ACE / tests / Monotonic_Message_Queue_Test.cpp
blob846761d312ecd8205e290d71808d37dea351ac31
//=============================================================================
/**
 *  @file    Monotonic_Message_Queue_Test.cpp
 *
 *  This is a test that verifies the time policy features of the
 *  ACE_Message_Queue template.
 *  A template instantiation based on the ACE_Monotonic_Time_Policy
 *  is used to demonstrate the ability to make the message queue
 *  timeouts independent of system time changes (time shift).
 *
 *  @author Martin Corino <mcorino@remedy.nl>
 */
//=============================================================================
17 #include "test_config.h"
18 #include "ace/Reactor.h"
19 #include "ace/Timer_Queue.h"
20 #include "ace/Thread_Manager.h"
21 #include "ace/Message_Queue.h"
22 #include "ace/Monotonic_Time_Policy.h"
23 #include "ace/Synch_Traits.h"
24 #include "ace/Timer_Heap_T.h"
25 #include "ace/Event_Handler_Handle_Timeout_Upcall.h"
26 #include "ace/TP_Reactor.h"
27 #include "ace/Task_T.h"
28 #include "ace/Truncate.h"
29 #include "ace/OS_NS_stdio.h"
30 #include "ace/OS_NS_string.h"
31 #include "ace/OS_NS_sys_time.h"
32 #include "ace/OS_NS_time.h"
33 #include "ace/OS_NS_unistd.h"
35 #if defined (ACE_HAS_MONOTONIC_TIME_POLICY) && defined (ACE_HAS_MONOTONIC_CONDITIONS)
37 # if defined (ACE_WIN32)
38 # include "ace/Date_Time.h"
39 # endif
41 # if defined (ACE_HAS_THREADS)
42 using SYNCH_QUEUE = ACE_Message_Queue<ACE_MT_SYNCH, ACE_Monotonic_Time_Policy>;
44 // Create timer queue with hr support
45 ACE_Timer_Queue *
46 create_timer_queue ()
48 ACE_Timer_Queue * tmq = 0;
50 using timer_queue_type = ACE_Timer_Heap_T<ACE_Event_Handler *, ACE_Event_Handler_Handle_Timeout_Upcall, ACE_MT_SYNCH::RECURSIVE_MUTEX, ACE_HR_Time_Policy>;
51 ACE_NEW_RETURN (tmq, timer_queue_type (), 0);
53 return tmq;
56 class MyTask : public ACE_Task<ACE_MT_SYNCH>
58 public:
59 MyTask () : my_reactor_ (0), my_tq_ (0) {}
61 ~MyTask () override { stop (); }
63 int svc () override;
65 int start (int num_threads);
66 int stop ();
67 ACE_Reactor* get_reactor ();
68 int create_reactor ();
70 private:
71 int delete_reactor ();
73 ACE_SYNCH_RECURSIVE_MUTEX lock_;
74 ACE_Reactor *my_reactor_;
75 ACE_Timer_Queue *my_tq_;
78 ACE_Reactor*
79 MyTask::get_reactor ()
81 return my_reactor_;
84 int
85 MyTask::create_reactor ()
87 ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX,
88 monitor,
89 this->lock_,
90 -1);
92 ACE_TEST_ASSERT (this->my_reactor_ == 0);
94 this->my_tq_ = create_timer_queue ();
96 ACE_TP_Reactor * pImpl = 0;
98 ACE_NEW_RETURN (pImpl,ACE_TP_Reactor (0, this->my_tq_), -1);
100 ACE_NEW_RETURN (my_reactor_,
101 ACE_Reactor (pImpl ,1),
102 -1);
104 ACE_DEBUG ((LM_DEBUG,
105 ACE_TEXT (" (%t) Create TP_Reactor\n")));
107 this->reactor (my_reactor_);
109 return 0;
113 MyTask::delete_reactor ()
115 ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX,
116 monitor,
117 this->lock_,
118 -1);
120 ACE_DEBUG ((LM_DEBUG,
121 ACE_TEXT (" (%t) Delete TP_Reactor\n")));
123 this->reactor (0);
124 delete this->my_reactor_;
125 this->my_reactor_ = 0;
126 delete this->my_tq_;
127 this->my_tq_ = 0;
128 return 0;
132 MyTask::start (int num_threads)
134 if (this->activate (THR_NEW_LWP, num_threads) == -1)
135 ACE_ERROR_RETURN ((LM_ERROR,
136 ACE_TEXT ("%p.\n"),
137 ACE_TEXT ("unable to activate thread pool")),
138 -1);
140 return 0;
145 MyTask::stop ()
147 if (this->my_reactor_ != 0)
149 ACE_DEBUG ((LM_DEBUG,
150 ACE_TEXT ("End TP_Reactor event loop\n")));
152 this->my_reactor_->end_reactor_event_loop ();
155 if (this->wait () == -1)
156 ACE_ERROR ((LM_ERROR,
157 ACE_TEXT ("%p.\n"),
158 ACE_TEXT ("unable to stop thread pool")));
160 if (this->delete_reactor () == -1)
161 ACE_ERROR ((LM_ERROR,
162 ACE_TEXT ("%p.\n"),
163 ACE_TEXT ("unable to delete reactor")));
165 return 0;
169 MyTask::svc ()
171 ACE_DEBUG ((LM_DEBUG, ACE_TEXT (" (%P|%t) MyTask started\n")));
173 while (this->my_reactor_->reactor_event_loop_done () == 0)
174 this->my_reactor_->run_reactor_event_loop ();
176 ACE_DEBUG ((LM_DEBUG, ACE_TEXT (" (%t) MyTask finished\n")));
177 return 0;
180 class TestHandler
181 : public ACE_Event_Handler
183 public:
184 TestHandler (ACE_Reactor* reactor, SYNCH_QUEUE &mq)
185 : reactor_ (reactor),
186 mq_ (mq)
189 int handle_timeout (const ACE_Time_Value &tv,
190 const void *arg) override;
192 bool trigger_in(const ACE_Time_Value &delay);
194 private:
195 ACE_Reactor* reactor_;
196 SYNCH_QUEUE& mq_;
199 int TestHandler::handle_timeout (const ACE_Time_Value &,
200 const void *)
202 const char S1[] = "message";
204 ACE_DEBUG ((LM_DEBUG, "(%P|%t) TestHandler::handle_timeout - timeout triggered\n"));
206 ACE_Message_Block* mb1 = new ACE_Message_Block(S1, sizeof S1);
208 this->mq_.enqueue_tail (mb1);
210 return 0;
213 bool TestHandler::trigger_in(const ACE_Time_Value &delay)
215 ACE_DEBUG ((LM_DEBUG, "(%P|%t) TestHandler::trigger_in - scheduling timer\n"));
216 return -1 != reactor_->schedule_timer (this, 0, delay, ACE_Time_Value (0));
219 void set_system_time(const ACE_Time_Value& tv)
221 # if defined (ACE_WIN32)
222 ACE_Date_Time curdt (tv);
223 SYSTEMTIME sys_time;
224 sys_time.wDay = ACE_Utils::truncate_cast <WORD> (curdt.day ());
225 sys_time.wMonth = ACE_Utils::truncate_cast <WORD> (curdt.month ());
226 sys_time.wYear = ACE_Utils::truncate_cast <WORD> (curdt.year ());
227 sys_time.wHour = ACE_Utils::truncate_cast <WORD> (curdt.hour ());
228 sys_time.wMinute = ACE_Utils::truncate_cast <WORD> (curdt.minute ());
229 sys_time.wSecond = ACE_Utils::truncate_cast <WORD> (curdt.second ());
230 sys_time.wMilliseconds = ACE_Utils::truncate_cast <WORD> (curdt.microsec () / 1000);
231 if (!::SetLocalTime (&sys_time))
232 # else
233 timespec_t curts;
234 curts = tv;
235 if (ACE_OS::clock_settime (CLOCK_REALTIME, &curts) != 0)
236 # endif
238 ACE_DEBUG((LM_INFO,
239 "(%P|%t) Unable to reset OS time. Insufficient privileges or not supported.\n"));
243 // Ensure that the timedout dequeue_head() keeps working in case of timeshift when using monotonic timer.
245 static bool
246 timeout_test ()
248 bool status = true;
249 SYNCH_QUEUE mq;
250 MyTask task1;
251 task1.create_reactor ();
252 task1.start (1);
253 TestHandler test_handler (task1.get_reactor (), mq);
255 // The reactor of taks1 that uses a hrtimer will trigger a timeout in
256 // 5 seconds which will enqueue a message block in the queue. At the
257 // same moment we calculate a timeout for the dequeue operation for
258 // 3 seconds in the future. Than we set the system time 4 seconds back.
259 // The condition should timeout because the queue is empty and the trigger
260 // only fires after the condition has timed out.
261 // Next we start another dequeue for 3 seconds in the future which should
262 // return before timing out because by then the trigger should have fired.
263 // In case of using regular system time policy for message queue and
264 // dequeue timeouts the first dequeue would not have timed out because
265 // between calculating the timeout and starting the dequeue the system time
266 // shifted back 4 sec causing the trigger to fire before the timeout elapsed.
267 // In case timeshifting does not work because of priority problems or such
268 // the test should succeed.
270 if (!test_handler.trigger_in (ACE_Time_Value (5, 0)))
271 ACE_ERROR_RETURN ((LM_ERROR,
272 "(%P|%t) Unable to schedule trigger.\n"),
273 false);
275 if (!mq.is_empty ())
277 ACE_ERROR ((LM_ERROR,
278 ACE_TEXT ("New queue is not empty!\n")));
279 status = false;
281 else
283 ACE_Message_Block *b = 0;
284 ACE_Time_Value_T<ACE_Monotonic_Time_Policy> tv;
285 tv = (tv.now () + ACE_Time_Value (3,0)); // Now (monotonic time) + 3 sec
287 // shift back in time 4 sec
288 set_system_time (ACE_OS::gettimeofday () - ACE_Time_Value (4, 0));
290 if (mq.dequeue_head (b, &tv) != -1)
292 ACE_ERROR ((LM_ERROR,
293 ACE_TEXT ("Dequeued before timeout elapsed!\n")));
294 status = false;
296 else if (errno != EWOULDBLOCK)
298 ACE_ERROR ((LM_ERROR,
299 ACE_TEXT ("%p\n"),
300 ACE_TEXT ("Dequeue timeout should be EWOULDBLOCK, got")));
301 status = false;
303 else
305 ACE_DEBUG ((LM_DEBUG,
306 ACE_TEXT ("First dequeue timed out: OK\n")));
308 tv = (tv.now () + ACE_Time_Value (3,0)); // Now (monotonic time) + 3 sec
309 if (mq.dequeue_head (b, &tv) != -1)
311 ACE_DEBUG ((LM_DEBUG,
312 ACE_TEXT ("Second dequeue succeeded: OK\n")));
313 delete b;
315 else
317 ACE_ERROR ((LM_ERROR,
318 ACE_TEXT ("Second dequeue timed out!\n")));
319 status = false;
323 // restore time
324 set_system_time (ACE_OS::gettimeofday () + ACE_Time_Value (4, 0));
327 ACE_DEBUG((LM_INFO,
328 "(%P|%t) Asking worker thread to finish.\n"));
329 task1.stop ();
331 ACE_Thread_Manager::instance ()->wait ();
333 return status;
335 # endif /* ACE_HAS_THREADS */
338 run_main (int , ACE_TCHAR *[])
340 ACE_START_TEST (ACE_TEXT ("Monotonic_Message_Queue_Test"));
342 int status = 0;
344 # if defined (ACE_HAS_THREADS)
345 if (!timeout_test ())
347 ACE_ERROR ((LM_ERROR,
348 ACE_TEXT ("%p\n"),
349 ACE_TEXT ("test failed")));
350 status = 1;
352 # endif /* ACE_HAS_THREADS */
354 ACE_END_TEST;
355 return status;
358 #else
361 run_main (int , ACE_TCHAR *[])
363 ACE_START_TEST (ACE_TEXT ("Monotonic_Message_Queue_Test"));
364 ACE_DEBUG((LM_INFO,
365 "(%P|%t) ACE not compiled with monotonic time.\n"));
366 ACE_END_TEST;
367 return 0;
370 #endif