2 //=============================================================================
4 * @file Monotonic_Message_Queue_Test.cpp
6 * This is a test that verifies the time policy features of the
7 * ACE_Message_Queue template.
8 * A template instantiation based on the ACE_Monotonic_Time_Policy
9 * is used to demonstrate that message queue timeouts can be made
10 * independent of system time changes (time shift).
12 * @author Martin Corino <mcorino@remedy.nl>
14 //=============================================================================
17 #include "test_config.h"
18 #include "ace/Reactor.h"
19 #include "ace/Timer_Queue.h"
20 #include "ace/Thread_Manager.h"
21 #include "ace/Message_Queue.h"
22 #include "ace/Monotonic_Time_Policy.h"
23 #include "ace/Synch_Traits.h"
24 #include "ace/Timer_Heap_T.h"
25 #include "ace/Event_Handler_Handle_Timeout_Upcall.h"
26 #include "ace/TP_Reactor.h"
27 #include "ace/Task_T.h"
28 #include "ace/Truncate.h"
29 #include "ace/OS_NS_stdio.h"
30 #include "ace/OS_NS_string.h"
31 #include "ace/OS_NS_sys_time.h"
32 #include "ace/OS_NS_time.h"
33 #include "ace/OS_NS_unistd.h"
35 #if defined (ACE_HAS_MONOTONIC_TIME_POLICY) && defined (ACE_HAS_MONOTONIC_CONDITIONS)
37 # if defined (ACE_WIN32)
38 # include "ace/Date_Time.h"
41 # if defined (ACE_HAS_THREADS)
// Message queue type under test: ACE_Message_Queue instantiated with
// ACE_MT_SYNCH and ACE_Monotonic_Time_Policy.
42 using SYNCH_QUEUE
= ACE_Message_Queue
<ACE_MT_SYNCH
, ACE_Monotonic_Time_Policy
>;
44 // Create timer queue with hr support
// The result is handed out as a plain ACE_Timer_Queue pointer, initially null.
48 ACE_Timer_Queue
* tmq
= 0;
// Concrete queue type: a timer heap of ACE_Event_Handler pointers using the
// handle_timeout upcall, a recursive mutex and ACE_HR_Time_Policy.
50 using timer_queue_type
= ACE_Timer_Heap_T
<ACE_Event_Handler
*, ACE_Event_Handler_Handle_Timeout_Upcall
, ACE_MT_SYNCH::RECURSIVE_MUTEX
, ACE_HR_Time_Policy
>;
// Allocate the queue; the trailing 0 is the value ACE_NEW_RETURN is given
// to return on failure.
51 ACE_NEW_RETURN (tmq
, timer_queue_type (), 0);
// Task built on ACE_Task<ACE_MT_SYNCH> that holds the reactor and the
// timer queue used by this test.
56 class MyTask
: public ACE_Task
<ACE_MT_SYNCH
>
// Both owned pointers start out null.
59 MyTask () : my_reactor_ (0), my_tq_ (0) {}
// Destruction goes through stop ().
61 ~MyTask () override
{ stop (); }
// Activates num_threads threads for this task.
65 int start (int num_threads
);
// Accessor for the reactor; the test passes its result to TestHandler.
67 ACE_Reactor
* get_reactor ();
// Builds the timer queue and the TP reactor and installs the reactor on
// this task.
68 int create_reactor ();
// Deletes the reactor and resets the pointer.
71 int delete_reactor ();
// Recursive mutex; create_reactor () and delete_reactor () both open an
// ACE_GUARD_RETURN for this mutex type.
73 ACE_SYNCH_RECURSIVE_MUTEX lock_
;
// Reactor created by create_reactor () and deleted by delete_reactor ().
74 ACE_Reactor
*my_reactor_
;
// Timer queue obtained from create_timer_queue () in create_reactor ().
75 ACE_Timer_Queue
*my_tq_
;
// Accessor declared in MyTask as `ACE_Reactor* get_reactor ()`; the test
// uses its result to construct the TestHandler.
79 MyTask::get_reactor ()
// Creates the timer queue and an ACE_TP_Reactor that uses it, wraps the
// implementation in an ACE_Reactor and installs that reactor on this task.
85 MyTask::create_reactor ()
// Scoped guard of the recursive mutex type.
87 ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX
,
// A reactor must not exist yet when this is called.
92 ACE_TEST_ASSERT (this->my_reactor_
== 0);
94 this->my_tq_
= create_timer_queue ();
96 ACE_TP_Reactor
* pImpl
= 0;
// The TP reactor is constructed with this task's timer queue; -1 is the
// failure value given to ACE_NEW_RETURN.
98 ACE_NEW_RETURN (pImpl
,ACE_TP_Reactor (0, this->my_tq_
), -1);
// NOTE(review): the second ACE_Reactor constructor argument (1) presumably
// makes the reactor own and delete pImpl - confirm against the ACE docs.
100 ACE_NEW_RETURN (my_reactor_
,
101 ACE_Reactor (pImpl
,1),
104 ACE_DEBUG ((LM_DEBUG
,
105 ACE_TEXT (" (%t) Create TP_Reactor\n")));
// Install the new reactor on this task.
107 this->reactor (my_reactor_
);
// Logs the teardown, deletes the reactor held by this task and resets the
// pointer to null.
113 MyTask::delete_reactor ()
// Scoped guard of the recursive mutex type, as in create_reactor ().
115 ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX
,
120 ACE_DEBUG ((LM_DEBUG
,
121 ACE_TEXT (" (%t) Delete TP_Reactor\n")));
124 delete this->my_reactor_
;
// Null the pointer after the delete.
125 this->my_reactor_
= 0;
// Activates num_threads threads for this task using the THR_NEW_LWP flag.
// A -1 result from activate () is reported through ACE_ERROR_RETURN.
132 MyTask::start (int num_threads
)
134 if (this->activate (THR_NEW_LWP
, num_threads
) == -1)
135 ACE_ERROR_RETURN ((LM_ERROR
,
137 ACE_TEXT ("unable to activate thread pool")),
// Shutdown sequence: end the reactor event loop (if a reactor exists), wait
// for the task's threads, then delete the reactor.
// NOTE(review): presumably the body of MyTask::stop (), which the destructor
// calls - confirm against the full file.
147 if (this->my_reactor_
!= 0)
149 ACE_DEBUG ((LM_DEBUG
,
150 ACE_TEXT ("End TP_Reactor event loop\n")));
152 this->my_reactor_
->end_reactor_event_loop ();
// Failures of wait () and delete_reactor () are logged with ACE_ERROR.
155 if (this->wait () == -1)
156 ACE_ERROR ((LM_ERROR
,
158 ACE_TEXT ("unable to stop thread pool")));
160 if (this->delete_reactor () == -1)
161 ACE_ERROR ((LM_ERROR
,
163 ACE_TEXT ("unable to delete reactor")));
// Logs start-up, then keeps calling run_reactor_event_loop () for as long as
// reactor_event_loop_done () reports 0, and finally logs completion.
// NOTE(review): presumably the thread entry point of MyTask (svc) - confirm
// against the full file.
171 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT (" (%P|%t) MyTask started\n")));
// Re-enter the event loop until the reactor reports that it is done.
173 while (this->my_reactor_
->reactor_event_loop_done () == 0)
174 this->my_reactor_
->run_reactor_event_loop ();
176 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT (" (%t) MyTask finished\n")));
// Event handler that, when its timer fires, enqueues a message block into
// the message queue it was constructed with.
181 : public ACE_Event_Handler
// reactor: reactor used to schedule the timer; mq: queue that receives the
// message block on timeout.
184 TestHandler (ACE_Reactor
* reactor
, SYNCH_QUEUE
&mq
)
185 : reactor_ (reactor
),
// Timer callback; enqueues one message block.
189 int handle_timeout (const ACE_Time_Value
&tv
,
190 const void *arg
) override
;
// Schedules this handler on the reactor after the given delay; returns
// whether scheduling succeeded.
192 bool trigger_in(const ACE_Time_Value
&delay
);
// Reactor on which trigger_in () schedules the timer.
195 ACE_Reactor
* reactor_
;
// Timer callback: logs the event and enqueues one message block carrying the
// text "message" at the tail of the handler's queue.
199 int TestHandler::handle_timeout (const ACE_Time_Value
&,
202 const char S1
[] = "message";
204 ACE_DEBUG ((LM_DEBUG
, "(%P|%t) TestHandler::handle_timeout - timeout triggered\n"));
// sizeof S1 counts the terminating NUL of the string literal.
206 ACE_Message_Block
* mb1
= new ACE_Message_Block(S1
, sizeof S1
);
// The return value of enqueue_tail () is not checked here.
208 this->mq_
.enqueue_tail (mb1
);
// Schedules this handler on the reactor to fire after `delay`.
// Returns true unless schedule_timer () returned -1.
213 bool TestHandler::trigger_in(const ACE_Time_Value
&delay
)
215 ACE_DEBUG ((LM_DEBUG
, "(%P|%t) TestHandler::trigger_in - scheduling timer\n"));
// NOTE(review): the zero ACE_Time_Value passed last is presumably the repeat
// interval, i.e. a one-shot timer - confirm against the ACE_Reactor docs.
216 return -1 != reactor_
->schedule_timer (this, 0, delay
, ACE_Time_Value (0));
// Sets the operating system's clock to the time given in tv.
// On ACE_WIN32 the value is broken down through ACE_Date_Time into a
// SYSTEMTIME-style structure and applied with ::SetLocalTime; the other
// visible path uses ACE_OS::clock_settime on CLOCK_REALTIME.
// A failure is reported with the "Unable to reset OS time" message.
219 void set_system_time(const ACE_Time_Value
& tv
)
221 # if defined (ACE_WIN32)
// Calendar breakdown of tv.
222 ACE_Date_Time
curdt (tv
);
// Copy each calendar field, narrowing to WORD with truncate_cast.
224 sys_time
.wDay
= ACE_Utils::truncate_cast
<WORD
> (curdt
.day ());
225 sys_time
.wMonth
= ACE_Utils::truncate_cast
<WORD
> (curdt
.month ());
226 sys_time
.wYear
= ACE_Utils::truncate_cast
<WORD
> (curdt
.year ());
227 sys_time
.wHour
= ACE_Utils::truncate_cast
<WORD
> (curdt
.hour ());
228 sys_time
.wMinute
= ACE_Utils::truncate_cast
<WORD
> (curdt
.minute ());
229 sys_time
.wSecond
= ACE_Utils::truncate_cast
<WORD
> (curdt
.second ());
// Microseconds are converted to milliseconds for wMilliseconds.
230 sys_time
.wMilliseconds
= ACE_Utils::truncate_cast
<WORD
> (curdt
.microsec () / 1000);
231 if (!::SetLocalTime (&sys_time
))
// Other visible path: set the realtime clock directly.
235 if (ACE_OS::clock_settime (CLOCK_REALTIME
, &curts
) != 0)
239 "(%P|%t) Unable to reset OS time. Insufficient privileges or not supported.\n"));
243 // Ensure that the timed-out dequeue_head() keeps working in case of a time shift when using the monotonic timer.
// Create the reactor on the task before handing it to the handler.
251 task1
.create_reactor ();
253 TestHandler
test_handler (task1
.get_reactor (), mq
);
255 // The reactor of task1, which uses a high-resolution timer, will trigger a
256 // timeout in 5 seconds which will enqueue a message block in the queue. At the
257 // same moment we calculate a timeout for the dequeue operation for
258 // 3 seconds in the future. Then we set the system time 4 seconds back.
259 // The condition should time out because the queue is empty and the trigger
260 // only fires after the condition has timed out.
261 // Next we start another dequeue for 3 seconds in the future which should
262 // return before timing out because by then the trigger should have fired.
263 // In case of using the regular system time policy for message queue and
264 // dequeue timeouts the first dequeue would not have timed out because
265 // between calculating the timeout and starting the dequeue the system time
266 // shifted back 4 sec causing the trigger to fire before the timeout elapsed.
267 // In case timeshifting does not work because of priority problems or such
268 // the test should succeed.
// Schedule the 5 second trigger; failure to schedule is reported through
// ACE_ERROR_RETURN.
270 if (!test_handler
.trigger_in (ACE_Time_Value (5, 0)))
271 ACE_ERROR_RETURN ((LM_ERROR
,
272 "(%P|%t) Unable to schedule trigger.\n"),
277 ACE_ERROR ((LM_ERROR
,
278 ACE_TEXT ("New queue is not empty!\n")));
283 ACE_Message_Block
*b
= 0;
// Absolute dequeue deadline expressed in monotonic time.
284 ACE_Time_Value_T
<ACE_Monotonic_Time_Policy
> tv
;
285 tv
= (tv
.now () + ACE_Time_Value (3,0)); // Now (monotonic time) + 3 sec
287 // shift back in time 4 sec
288 set_system_time (ACE_OS::gettimeofday () - ACE_Time_Value (4, 0));
// First dequeue: any result other than -1 means a message arrived before
// the deadline, which is reported as an error.
290 if (mq
.dequeue_head (b
, &tv
) != -1)
292 ACE_ERROR ((LM_ERROR
,
293 ACE_TEXT ("Dequeued before timeout elapsed!\n")));
// A failing dequeue must have set errno to EWOULDBLOCK.
296 else if (errno
!= EWOULDBLOCK
)
298 ACE_ERROR ((LM_ERROR
,
300 ACE_TEXT ("Dequeue timeout should be EWOULDBLOCK, got")));
305 ACE_DEBUG ((LM_DEBUG
,
306 ACE_TEXT ("First dequeue timed out: OK\n")));
// Second dequeue with a fresh 3 second deadline: a result other than -1 is
// logged as success.
308 tv
= (tv
.now () + ACE_Time_Value (3,0)); // Now (monotonic time) + 3 sec
309 if (mq
.dequeue_head (b
, &tv
) != -1)
311 ACE_DEBUG ((LM_DEBUG
,
312 ACE_TEXT ("Second dequeue succeeded: OK\n")));
317 ACE_ERROR ((LM_ERROR
,
318 ACE_TEXT ("Second dequeue timed out!\n")));
// Shift the system time 4 seconds forward again, undoing the earlier shift.
324 set_system_time (ACE_OS::gettimeofday () + ACE_Time_Value (4, 0));
328 "(%P|%t) Asking worker thread to finish.\n"));
// Wait on the thread manager singleton before leaving the test.
331 ACE_Thread_Manager::instance ()->wait ();
335 # endif /* ACE_HAS_THREADS */
// Test entry point for builds with monotonic time support: starts the test
// log and, when threads are available, runs timeout_test ().
338 run_main (int , ACE_TCHAR
*[])
340 ACE_START_TEST (ACE_TEXT ("Monotonic_Message_Queue_Test"));
344 # if defined (ACE_HAS_THREADS)
// A false result from timeout_test () is logged as a test failure.
345 if (!timeout_test ())
347 ACE_ERROR ((LM_ERROR
,
349 ACE_TEXT ("test failed")));
352 # endif /* ACE_HAS_THREADS */
// Second run_main definition: starts the test log and emits the message
// that ACE was not compiled with monotonic time.
361 run_main (int , ACE_TCHAR
*[])
363 ACE_START_TEST (ACE_TEXT ("Monotonic_Message_Queue_Test"));
365 "(%P|%t) ACE not compiled with monotonic time.\n"));