Also use Objects as part of an operation but as a result don't generate Any operation...
[ACE_TAO.git] / ACE / tests / Monotonic_Message_Queue_Test.cpp
blobbbe1f5f594f426c9384c9837c3166f46d7f2a050
2 //=============================================================================
3 /**
4 * @file Monotonic_Message_Queue_Test.cpp
6 * This is a test that verifies the time policy features of the
7 * ACE_Message_Queue template.
 * A template instantiation based on the ACE_Monotonic_Time_Policy
 * is used to demonstrate the ability to make message queue
 * timeouts independent of system time changes (time shifts).
12 * @author Martin Corino <mcorino@remedy.nl>
14 //=============================================================================
17 #include "test_config.h"
18 #include "ace/Reactor.h"
19 #include "ace/Timer_Queue.h"
20 #include "ace/Thread_Manager.h"
21 #include "ace/Message_Queue.h"
22 #include "ace/Monotonic_Time_Policy.h"
23 #include "ace/Synch_Traits.h"
24 #include "ace/Timer_Heap_T.h"
25 #include "ace/Event_Handler_Handle_Timeout_Upcall.h"
26 #include "ace/TP_Reactor.h"
27 #include "ace/Task_T.h"
28 #include "ace/Truncate.h"
29 #include "ace/OS_NS_stdio.h"
30 #include "ace/OS_NS_string.h"
31 #include "ace/OS_NS_sys_time.h"
32 #include "ace/OS_NS_time.h"
33 #include "ace/OS_NS_unistd.h"
35 #if defined (ACE_HAS_MONOTONIC_TIME_POLICY) && defined (ACE_HAS_MONOTONIC_CONDITIONS)
37 # if defined (ACE_WIN32)
38 # include "ace/Date_Time.h"
39 # endif
41 # if defined (ACE_HAS_THREADS)
42 typedef ACE_Message_Queue<ACE_MT_SYNCH, ACE_Monotonic_Time_Policy> SYNCH_QUEUE;
44 // Create timer queue with hr support
45 ACE_Timer_Queue *
46 create_timer_queue (void)
48 ACE_Timer_Queue * tmq = 0;
50 typedef ACE_Timer_Heap_T<ACE_Event_Handler *,
51 ACE_Event_Handler_Handle_Timeout_Upcall,
52 ACE_SYNCH_RECURSIVE_MUTEX,
53 ACE_HR_Time_Policy> timer_queue_type;
54 ACE_NEW_RETURN (tmq, timer_queue_type (), 0);
56 return tmq;
59 class MyTask : public ACE_Task<ACE_MT_SYNCH>
61 public:
62 MyTask () : my_reactor_ (0), my_tq_ (0) {}
64 virtual ~MyTask () { stop (); }
66 virtual int svc (void);
68 int start (int num_threads);
69 int stop (void);
70 ACE_Reactor* get_reactor ();
71 int create_reactor (void);
73 private:
74 int delete_reactor (void);
76 ACE_SYNCH_RECURSIVE_MUTEX lock_;
77 ACE_Reactor *my_reactor_;
78 ACE_Timer_Queue *my_tq_;
81 ACE_Reactor*
82 MyTask::get_reactor ()
84 return my_reactor_;
87 int
88 MyTask::create_reactor (void)
90 ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX,
91 monitor,
92 this->lock_,
93 -1);
95 ACE_TEST_ASSERT (this->my_reactor_ == 0);
97 this->my_tq_ = create_timer_queue ();
99 ACE_TP_Reactor * pImpl = 0;
101 ACE_NEW_RETURN (pImpl,ACE_TP_Reactor (0, this->my_tq_), -1);
103 ACE_NEW_RETURN (my_reactor_,
104 ACE_Reactor (pImpl ,1),
105 -1);
107 ACE_DEBUG ((LM_DEBUG,
108 ACE_TEXT (" (%t) Create TP_Reactor\n")));
110 this->reactor (my_reactor_);
112 return 0;
116 MyTask::delete_reactor (void)
118 ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX,
119 monitor,
120 this->lock_,
121 -1);
123 ACE_DEBUG ((LM_DEBUG,
124 ACE_TEXT (" (%t) Delete TP_Reactor\n")));
126 this->reactor (0);
127 delete this->my_reactor_;
128 this->my_reactor_ = 0;
129 delete this->my_tq_;
130 this->my_tq_ = 0;
131 return 0;
135 MyTask::start (int num_threads)
137 if (this->activate (THR_NEW_LWP, num_threads) == -1)
138 ACE_ERROR_RETURN ((LM_ERROR,
139 ACE_TEXT ("%p.\n"),
140 ACE_TEXT ("unable to activate thread pool")),
141 -1);
143 return 0;
148 MyTask::stop (void)
150 if (this->my_reactor_ != 0)
152 ACE_DEBUG ((LM_DEBUG,
153 ACE_TEXT ("End TP_Reactor event loop\n")));
155 this->my_reactor_->end_reactor_event_loop ();
158 if (this->wait () == -1)
159 ACE_ERROR ((LM_ERROR,
160 ACE_TEXT ("%p.\n"),
161 ACE_TEXT ("unable to stop thread pool")));
163 if (this->delete_reactor () == -1)
164 ACE_ERROR ((LM_ERROR,
165 ACE_TEXT ("%p.\n"),
166 ACE_TEXT ("unable to delete reactor")));
168 return 0;
172 MyTask::svc (void)
174 ACE_DEBUG ((LM_DEBUG, ACE_TEXT (" (%P|%t) MyTask started\n")));
176 while (this->my_reactor_->reactor_event_loop_done () == 0)
177 this->my_reactor_->run_reactor_event_loop ();
179 ACE_DEBUG ((LM_DEBUG, ACE_TEXT (" (%t) MyTask finished\n")));
180 return 0;
183 class TestHandler
184 : public ACE_Event_Handler
186 public:
187 TestHandler (ACE_Reactor* reactor, SYNCH_QUEUE &mq)
188 : reactor_ (reactor),
189 mq_ (mq)
192 virtual int handle_timeout (const ACE_Time_Value &tv,
193 const void *arg);
195 bool trigger_in(const ACE_Time_Value &delay);
197 private:
198 ACE_Reactor* reactor_;
199 SYNCH_QUEUE& mq_;
202 int TestHandler::handle_timeout (const ACE_Time_Value &,
203 const void *)
205 const char S1[] = "message";
207 ACE_DEBUG ((LM_DEBUG, "(%P|%t) TestHandler::handle_timeout - timeout triggered\n"));
209 ACE_Message_Block* mb1 = new ACE_Message_Block(S1, sizeof S1);
211 this->mq_.enqueue_tail (mb1);
213 return 0;
216 bool TestHandler::trigger_in(const ACE_Time_Value &delay)
218 ACE_DEBUG ((LM_DEBUG, "(%P|%t) TestHandler::trigger_in - scheduling timer\n"));
219 return -1 != reactor_->schedule_timer (this, 0, delay, ACE_Time_Value (0));
222 void set_system_time(const ACE_Time_Value& tv)
224 # if defined (ACE_WIN32)
225 ACE_Date_Time curdt (tv);
226 SYSTEMTIME sys_time;
227 sys_time.wDay = ACE_Utils::truncate_cast <WORD> (curdt.day ());
228 sys_time.wMonth = ACE_Utils::truncate_cast <WORD> (curdt.month ());
229 sys_time.wYear = ACE_Utils::truncate_cast <WORD> (curdt.year ());
230 sys_time.wHour = ACE_Utils::truncate_cast <WORD> (curdt.hour ());
231 sys_time.wMinute = ACE_Utils::truncate_cast <WORD> (curdt.minute ());
232 sys_time.wSecond = ACE_Utils::truncate_cast <WORD> (curdt.second ());
233 sys_time.wMilliseconds = ACE_Utils::truncate_cast <WORD> (curdt.microsec () / 1000);
234 if (!::SetLocalTime (&sys_time))
235 # else
236 timespec_t curts;
237 curts = tv;
238 if (ACE_OS::clock_settime (CLOCK_REALTIME, &curts) != 0)
239 # endif
241 ACE_DEBUG((LM_INFO,
242 "(%P|%t) Unable to reset OS time. Insufficient privileges or not supported.\n"));
246 // Ensure that the timedout dequeue_head() keeps working in case of timeshift when using monotonic timer.
248 static bool
249 timeout_test (void)
251 bool status = true;
252 SYNCH_QUEUE mq;
253 MyTask task1;
254 task1.create_reactor ();
255 task1.start (1);
256 TestHandler test_handler (task1.get_reactor (), mq);
258 // The reactor of taks1 that uses a hrtimer will trigger a timeout in
259 // 5 seconds which will enqueue a message block in the queue. At the
260 // same moment we calculate a timeout for the dequeue operation for
261 // 3 seconds in the future. Than we set the system time 4 seconds back.
262 // The condition should timeout because the queue is empty and the trigger
263 // only fires after the condition has timed out.
264 // Next we start another dequeue for 3 seconds in the future which should
265 // return before timing out because by then the trigger should have fired.
266 // In case of using regular system time policy for message queue and
267 // dequeue timeouts the first dequeue would not have timed out because
268 // between calculating the timeout and starting the dequeue the system time
269 // shifted back 4 sec causing the trigger to fire before the timeout elapsed.
270 // In case timeshifting does not work because of priority problems or such
271 // the test should succeed.
273 if (!test_handler.trigger_in (ACE_Time_Value (5, 0)))
274 ACE_ERROR_RETURN ((LM_ERROR,
275 "(%P|%t) Unable to schedule trigger.\n"),
276 false);
278 if (!mq.is_empty ())
280 ACE_ERROR ((LM_ERROR,
281 ACE_TEXT ("New queue is not empty!\n")));
282 status = false;
284 else
286 ACE_Message_Block *b = 0;
287 ACE_Time_Value_T<ACE_Monotonic_Time_Policy> tv;
288 tv = (tv.now () + ACE_Time_Value (3,0)); // Now (monotonic time) + 3 sec
290 // shift back in time 4 sec
291 set_system_time (ACE_OS::gettimeofday () - ACE_Time_Value (4, 0));
293 if (mq.dequeue_head (b, &tv) != -1)
295 ACE_ERROR ((LM_ERROR,
296 ACE_TEXT ("Dequeued before timeout elapsed!\n")));
297 status = false;
299 else if (errno != EWOULDBLOCK)
301 ACE_ERROR ((LM_ERROR,
302 ACE_TEXT ("%p\n"),
303 ACE_TEXT ("Dequeue timeout should be EWOULDBLOCK, got")));
304 status = false;
306 else
308 ACE_DEBUG ((LM_DEBUG,
309 ACE_TEXT ("First dequeue timed out: OK\n")));
311 tv = (tv.now () + ACE_Time_Value (3,0)); // Now (monotonic time) + 3 sec
312 if (mq.dequeue_head (b, &tv) != -1)
314 ACE_DEBUG ((LM_DEBUG,
315 ACE_TEXT ("Second dequeue succeeded: OK\n")));
316 delete b;
318 else
320 ACE_ERROR ((LM_ERROR,
321 ACE_TEXT ("Second dequeue timed out!\n")));
322 status = false;
326 // restore time
327 set_system_time (ACE_OS::gettimeofday () + ACE_Time_Value (4, 0));
330 ACE_DEBUG((LM_INFO,
331 "(%P|%t) Asking worker thread to finish.\n"));
332 task1.stop ();
334 ACE_Thread_Manager::instance ()->wait ();
336 return status;
338 # endif /* ACE_HAS_THREADS */
341 run_main (int , ACE_TCHAR *[])
343 ACE_START_TEST (ACE_TEXT ("Monotonic_Message_Queue_Test"));
345 int status = 0;
347 # if defined (ACE_HAS_THREADS)
348 if (!timeout_test ())
350 ACE_ERROR ((LM_ERROR,
351 ACE_TEXT ("%p\n"),
352 ACE_TEXT ("test failed")));
353 status = 1;
355 # endif /* ACE_HAS_THREADS */
357 ACE_END_TEST;
358 return status;
361 #else
364 run_main (int , ACE_TCHAR *[])
366 ACE_START_TEST (ACE_TEXT ("Monotonic_Message_Queue_Test"));
367 ACE_DEBUG((LM_INFO,
368 "(%P|%t) ACE not compiled with monotonic time.\n"));
369 ACE_END_TEST;
370 return 0;
373 #endif