2 //=============================================================================
4 * @file Monotonic_Message_Queue_Test.cpp
6 * This is a test that verifies the time policy features of the
7 * ACE_Message_Queue template.
8 * A template instantiation based on the ACE_Monotonic_Time_Policy
9 * is used to demonstrate the ability for making the message queue
10 * timeouts independent from system time changes (time shift).
12 * @author Martin Corino <mcorino@remedy.nl>
14 //=============================================================================
17 #include "test_config.h"
18 #include "ace/Reactor.h"
19 #include "ace/Timer_Queue.h"
20 #include "ace/Thread_Manager.h"
21 #include "ace/Message_Queue.h"
22 #include "ace/Monotonic_Time_Policy.h"
23 #include "ace/Synch_Traits.h"
24 #include "ace/Timer_Heap_T.h"
25 #include "ace/Event_Handler_Handle_Timeout_Upcall.h"
26 #include "ace/TP_Reactor.h"
27 #include "ace/Task_T.h"
28 #include "ace/Truncate.h"
29 #include "ace/OS_NS_stdio.h"
30 #include "ace/OS_NS_string.h"
31 #include "ace/OS_NS_sys_time.h"
32 #include "ace/OS_NS_time.h"
33 #include "ace/OS_NS_unistd.h"
35 #if defined (ACE_HAS_MONOTONIC_TIME_POLICY) && defined (ACE_HAS_MONOTONIC_CONDITIONS)
37 # if defined (ACE_WIN32)
38 # include "ace/Date_Time.h"
41 # if defined (ACE_HAS_THREADS)
// Message queue type exercised by this test: an ACE_Message_Queue
// instantiated with ACE_MT_SYNCH synchronization and
// ACE_Monotonic_Time_Policy as its time policy.
42 typedef ACE_Message_Queue
<ACE_MT_SYNCH
, ACE_Monotonic_Time_Policy
> SYNCH_QUEUE
;
44 // Create a timer queue with high-resolution (HR) time policy support.
// Allocates a timer_queue_type -- an ACE_Timer_Heap_T specialised for
// ACE_Event_Handler pointers, ACE_Event_Handler_Handle_Timeout_Upcall,
// ACE_SYNCH_RECURSIVE_MUTEX and ACE_HR_Time_Policy -- into tmq.
// ACE_NEW_RETURN is given 0 as the value to return when allocation fails.
46 create_timer_queue (void)
48 ACE_Timer_Queue
* tmq
= 0;
// Local alias for the concrete heap-based timer queue type.
50 typedef ACE_Timer_Heap_T
<ACE_Event_Handler
*,
51 ACE_Event_Handler_Handle_Timeout_Upcall
,
52 ACE_SYNCH_RECURSIVE_MUTEX
,
53 ACE_HR_Time_Policy
> timer_queue_type
;
54 ACE_NEW_RETURN (tmq
, timer_queue_type (), 0);
// Task derived from ACE_Task<ACE_MT_SYNCH> that holds a reactor pointer
// (my_reactor_), a timer queue pointer (my_tq_) and a recursive mutex
// member (lock_).
59 class MyTask
: public ACE_Task
<ACE_MT_SYNCH
>
// Starts with no reactor and no timer queue (both pointers are 0).
62 MyTask () : my_reactor_ (0), my_tq_ (0) {}
// Destruction calls stop ().
64 virtual ~MyTask () { stop (); }
66 virtual int svc (void);
// Takes the number of threads to activate.
68 int start (int num_threads
);
70 ACE_Reactor
* get_reactor ();
// create_reactor ()/delete_reactor () build and destroy my_reactor_.
71 int create_reactor (void);
74 int delete_reactor (void);
76 ACE_SYNCH_RECURSIVE_MUTEX lock_
;
77 ACE_Reactor
*my_reactor_
;
// Assigned from create_timer_queue () in create_reactor ().
78 ACE_Timer_Queue
*my_tq_
;
82 MyTask::get_reactor ()
// Build this task's reactor. Asserts that no reactor exists yet, creates
// the timer queue through create_timer_queue (), constructs an
// ACE_TP_Reactor on that queue, wraps it in an ACE_Reactor stored in
// my_reactor_, and finally installs it with this->reactor ().
88 MyTask::create_reactor (void)
90 ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX
,
95 ACE_TEST_ASSERT (this->my_reactor_
== 0);
// The timer queue created here is handed to the ACE_TP_Reactor below.
97 this->my_tq_
= create_timer_queue ();
99 ACE_TP_Reactor
* pImpl
= 0;
// ACE_NEW_RETURN is given -1 as the value to return when allocation fails.
101 ACE_NEW_RETURN (pImpl
,ACE_TP_Reactor (0, this->my_tq_
), -1);
// NOTE(review): the second ACE_Reactor argument (1) presumably makes the
// reactor own and delete pImpl -- confirm against the ACE_Reactor API.
103 ACE_NEW_RETURN (my_reactor_
,
104 ACE_Reactor (pImpl
,1),
107 ACE_DEBUG ((LM_DEBUG
,
108 ACE_TEXT (" (%t) Create TP_Reactor\n")));
// Make the new reactor the one associated with this task.
110 this->reactor (my_reactor_
);
// Destroy the reactor owned by this task: logs the deletion, deletes
// my_reactor_ and resets the pointer to 0.
116 MyTask::delete_reactor (void)
118 ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX
,
123 ACE_DEBUG ((LM_DEBUG
,
124 ACE_TEXT (" (%t) Delete TP_Reactor\n")));
// Reset the pointer after deletion so later null checks on my_reactor_
// see that no reactor exists.
127 delete this->my_reactor_
;
128 this->my_reactor_
= 0;
// Activate num_threads threads for this task using the THR_NEW_LWP flag.
// If activate () reports -1 the failure is logged through
// ACE_ERROR_RETURN, which also returns from this function.
135 MyTask::start (int num_threads
)
137 if (this->activate (THR_NEW_LWP
, num_threads
) == -1)
138 ACE_ERROR_RETURN ((LM_ERROR
,
140 ACE_TEXT ("unable to activate thread pool")),
150 if (this->my_reactor_
!= 0)
152 ACE_DEBUG ((LM_DEBUG
,
153 ACE_TEXT ("End TP_Reactor event loop\n")));
155 this->my_reactor_
->end_reactor_event_loop ();
158 if (this->wait () == -1)
159 ACE_ERROR ((LM_ERROR
,
161 ACE_TEXT ("unable to stop thread pool")));
163 if (this->delete_reactor () == -1)
164 ACE_ERROR ((LM_ERROR
,
166 ACE_TEXT ("unable to delete reactor")));
174 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT (" (%P|%t) MyTask started\n")));
176 while (this->my_reactor_
->reactor_event_loop_done () == 0)
177 this->my_reactor_
->run_reactor_event_loop ();
179 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT (" (%t) MyTask finished\n")));
184 : public ACE_Event_Handler
187 TestHandler (ACE_Reactor
* reactor
, SYNCH_QUEUE
&mq
)
188 : reactor_ (reactor
),
192 virtual int handle_timeout (const ACE_Time_Value
&tv
,
195 bool trigger_in(const ACE_Time_Value
&delay
);
198 ACE_Reactor
* reactor_
;
// Timer callback. Logs that the timeout fired, then wraps the text
// "message" (sizeof S1 bytes, i.e. including the terminating NUL) in a
// newly allocated ACE_Message_Block and appends it to the tail of mq_.
202 int TestHandler::handle_timeout (const ACE_Time_Value
&,
205 const char S1
[] = "message";
207 ACE_DEBUG ((LM_DEBUG
, "(%P|%t) TestHandler::handle_timeout - timeout triggered\n"));
// NOTE(review): S1 is a local array; this ACE_Message_Block constructor
// presumably references the buffer rather than copying it -- confirm that
// no consumer reads the payload after this function returns.
209 ACE_Message_Block
* mb1
= new ACE_Message_Block(S1
, sizeof S1
);
211 this->mq_
.enqueue_tail (mb1
);
// Schedule this handler on reactor_ to time out after `delay`, passing a
// zero interval (ACE_Time_Value (0)) and a null argument.
// Returns true unless schedule_timer () reports -1.
216 bool TestHandler::trigger_in(const ACE_Time_Value
&delay
)
218 ACE_DEBUG ((LM_DEBUG
, "(%P|%t) TestHandler::trigger_in - scheduling timer\n"));
219 return -1 != reactor_
->schedule_timer (this, 0, delay
, ACE_Time_Value (0));
// Attempt to set the operating system clock to tv.
// Under ACE_WIN32 the value is broken down with ACE_Date_Time, copied
// field by field into sys_time and applied with ::SetLocalTime (); the
// other visible path calls ACE_OS::clock_settime () on CLOCK_REALTIME.
// The visible log text reports a failed attempt as insufficient
// privileges or lack of support.
222 void set_system_time(const ACE_Time_Value
& tv
)
224 # if defined (ACE_WIN32)
225 ACE_Date_Time
curdt (tv
);
// Each calendar field is narrowed to WORD with truncate_cast.
227 sys_time
.wDay
= ACE_Utils::truncate_cast
<WORD
> (curdt
.day ());
228 sys_time
.wMonth
= ACE_Utils::truncate_cast
<WORD
> (curdt
.month ());
229 sys_time
.wYear
= ACE_Utils::truncate_cast
<WORD
> (curdt
.year ());
230 sys_time
.wHour
= ACE_Utils::truncate_cast
<WORD
> (curdt
.hour ());
231 sys_time
.wMinute
= ACE_Utils::truncate_cast
<WORD
> (curdt
.minute ());
232 sys_time
.wSecond
= ACE_Utils::truncate_cast
<WORD
> (curdt
.second ());
// Microseconds are converted to milliseconds for wMilliseconds.
233 sys_time
.wMilliseconds
= ACE_Utils::truncate_cast
<WORD
> (curdt
.microsec () / 1000);
234 if (!::SetLocalTime (&sys_time
))
238 if (ACE_OS::clock_settime (CLOCK_REALTIME
, &curts
) != 0)
242 "(%P|%t) Unable to reset OS time. Insufficient privileges or not supported.\n"));
246 // Ensure that a timed dequeue_head() keeps working across a system time shift when using the monotonic time policy.
254 task1
.create_reactor ();
256 TestHandler
test_handler (task1
.get_reactor (), mq
);
258 // The reactor of task1, which uses a high-resolution timer, will trigger a
259 // timeout in 5 seconds which will enqueue a message block in the queue. At
260 // the same moment we calculate a timeout for the dequeue operation for
261 // 3 seconds in the future. Then we set the system time 4 seconds back.
262 // The condition should time out because the queue is empty and the trigger
263 // only fires after the condition has timed out.
264 // Next we start another dequeue for 3 seconds in the future, which should
265 // return before timing out because by then the trigger should have fired.
266 // If the regular system time policy were used for the message queue and
267 // dequeue timeouts, the first dequeue would not have timed out, because
268 // between calculating the timeout and starting the dequeue the system time
269 // shifted back 4 sec, causing the trigger to fire before the timeout elapsed.
270 // In case time shifting does not work because of priority problems or such,
271 // the test should still succeed.
273 if (!test_handler
.trigger_in (ACE_Time_Value (5, 0)))
274 ACE_ERROR_RETURN ((LM_ERROR
,
275 "(%P|%t) Unable to schedule trigger.\n"),
280 ACE_ERROR ((LM_ERROR
,
281 ACE_TEXT ("New queue is not empty!\n")));
286 ACE_Message_Block
*b
= 0;
287 ACE_Time_Value_T
<ACE_Monotonic_Time_Policy
> tv
;
288 tv
= (tv
.now () + ACE_Time_Value (3,0)); // Now (monotonic time) + 3 sec
290 // shift back in time 4 sec
291 set_system_time (ACE_OS::gettimeofday () - ACE_Time_Value (4, 0));
293 if (mq
.dequeue_head (b
, &tv
) != -1)
295 ACE_ERROR ((LM_ERROR
,
296 ACE_TEXT ("Dequeued before timeout elapsed!\n")));
299 else if (errno
!= EWOULDBLOCK
)
301 ACE_ERROR ((LM_ERROR
,
303 ACE_TEXT ("Dequeue timeout should be EWOULDBLOCK, got")));
308 ACE_DEBUG ((LM_DEBUG
,
309 ACE_TEXT ("First dequeue timed out: OK\n")));
311 tv
= (tv
.now () + ACE_Time_Value (3,0)); // Now (monotonic time) + 3 sec
312 if (mq
.dequeue_head (b
, &tv
) != -1)
314 ACE_DEBUG ((LM_DEBUG
,
315 ACE_TEXT ("Second dequeue succeeded: OK\n")));
320 ACE_ERROR ((LM_ERROR
,
321 ACE_TEXT ("Second dequeue timed out!\n")));
327 set_system_time (ACE_OS::gettimeofday () + ACE_Time_Value (4, 0));
331 "(%P|%t) Asking worker thread to finish.\n"));
334 ACE_Thread_Manager::instance ()->wait ();
338 # endif /* ACE_HAS_THREADS */
// Test entry point. Starts the "Monotonic_Message_Queue_Test" log and,
// when ACE_HAS_THREADS is defined, runs timeout_test () and logs
// "test failed" if it returns false.
341 run_main (int , ACE_TCHAR
*[])
343 ACE_START_TEST (ACE_TEXT ("Monotonic_Message_Queue_Test"));
347 # if defined (ACE_HAS_THREADS)
348 if (!timeout_test ())
350 ACE_ERROR ((LM_ERROR
,
352 ACE_TEXT ("test failed")));
355 # endif /* ACE_HAS_THREADS */
// Alternative test entry point: starts the same test log and emits a
// message that ACE was not compiled with monotonic time.
// NOTE(review): presumably this is the branch taken when the
// ACE_HAS_MONOTONIC_TIME_POLICY / ACE_HAS_MONOTONIC_CONDITIONS guard is
// false -- the matching preprocessor directive is not visible here.
364 run_main (int , ACE_TCHAR
*[])
366 ACE_START_TEST (ACE_TEXT ("Monotonic_Message_Queue_Test"));
368 "(%P|%t) ACE not compiled with monotonic time.\n"));