Merge pull request #2309 from mitza-oci/warnings
[ACE_TAO.git] / ACE / examples / Threads / task_three.cpp
blob89d118daba0b725a1ce0b587dbd66f4967404526
1 // Exercise more tests for the <ACE_Task>s. This also shows off some
2 // Interesting uses of the <ACE_Log_Msg>'s ability to print to
3 // ostreams. BTW, make sure that you set the out_stream in *every*
4 // thread that you want to have write to the output file, i.e.:
5 //
6 // if (out_stream)
7 // {
8 // ACE_LOG_MSG->set_flags (ACE_Log_Msg::OSTREAM);
9 // ACE_LOG_MSG->msg_ostream (out_stream);
10 // }
12 #include "ace/OS_NS_unistd.h"
13 #include "ace/OS_NS_stdio.h"
14 #include "ace/OS_main.h"
15 #include "ace/Reactor.h"
16 #include "ace/Service_Config.h"
17 #include "ace/Task.h"
19 // FUZZ: disable check_for_streams_include
20 #include "ace/streams.h"
22 #include "ace/Signal.h"
25 #if defined (ACE_HAS_THREADS)
27 static ACE_OSTREAM_TYPE *out_stream = 0;
28 static sig_atomic_t done = 0;
29 static const size_t NUM_INVOCATIONS = 100;
30 static const size_t TASK_COUNT = 130;
32 class Test_Task : public ACE_Task<ACE_MT_SYNCH>
34 public:
35 Test_Task ();
36 ~Test_Task ();
38 //FUZZ: disable check_for_lack_ACE_OS
39 virtual int open (void *args = 0);
40 virtual int close (u_long flags = 0);
41 //FUZZ: enable check_for_lack_ACE_OS
43 virtual int svc ();
45 virtual int handle_input (ACE_HANDLE fd);
47 ACE_Reactor *r_;
48 size_t handled_;
49 static size_t current_count_;
50 static size_t done_cnt_;
53 size_t Test_Task::current_count_ = 0;
54 size_t Test_Task::done_cnt_ = 0;
56 static ACE_Thread_Mutex Lock;
58 Test_Task::Test_Task ()
60 ACE_GUARD (ACE_Thread_Mutex, ace_mon, Lock);
62 this->handled_ = 0;
63 Test_Task::current_count_++;
64 ACE_DEBUG ((LM_DEBUG,
65 ACE_TEXT ("Test_Task constructed, current_count_ = %d\n"),
66 Test_Task::current_count_));
69 Test_Task::~Test_Task ()
71 ACE_GUARD (ACE_Thread_Mutex, ace_mon, Lock);
73 ACE_DEBUG ((LM_DEBUG,
74 ACE_TEXT ("Test_Task destroyed, current_count_ = %d\n"),
75 Test_Task::current_count_));
78 int
79 Test_Task::open (void *args)
81 r_ = reinterpret_cast <ACE_Reactor *> (args);
82 return ACE_Task<ACE_MT_SYNCH>::activate (THR_NEW_LWP);
85 int
86 Test_Task::close (u_long)
88 ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, Lock, -1);
90 Test_Task::current_count_--;
91 ACE_DEBUG ((LM_DEBUG,
92 ACE_TEXT ("Test_Task::close () current_count_ = %d.\n"),
93 Test_Task::current_count_));
94 return 0;
97 int
98 Test_Task::svc ()
100 // Every thread must register the same stream to write to file.
101 if (out_stream)
103 ACE_LOG_MSG->set_flags (ACE_Log_Msg::OSTREAM);
104 ACE_LOG_MSG->msg_ostream (out_stream);
107 for (size_t index = 0; index < NUM_INVOCATIONS; index++)
109 ACE_OS::thr_yield ();
111 if (r_->notify (this, ACE_Event_Handler::READ_MASK) == -1)
113 ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, Lock, -1);
115 ACE_ERROR_RETURN ((LM_ERROR,
116 ACE_TEXT ("Test_Task: error %p!\n"),
117 ACE_TEXT ("notifying reactor")),
122 ACE_DEBUG ((LM_DEBUG, ACE_TEXT (" (%t) returning from svc ()\n")));
123 return 0;
127 Test_Task::handle_input (ACE_HANDLE)
129 this->handled_++;
131 if (this->handled_ == NUM_INVOCATIONS)
133 ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, Lock, -1);
134 Test_Task::done_cnt_++;
135 ACE_DEBUG ((LM_DEBUG,
136 ACE_TEXT (" (%t) Test_Task: handle_input done_cnt_ = %d.\n"),
137 Test_Task::done_cnt_));
140 ACE_OS::thr_yield ();
141 return -1;
144 static void *
145 dispatch (void *arg)
147 // every thread must register the same stream to write to file
148 if (out_stream)
150 ACE_LOG_MSG->set_flags (ACE_Log_Msg::OSTREAM);
151 ACE_LOG_MSG->msg_ostream (out_stream);
154 ACE_DEBUG ((LM_DEBUG, ACE_TEXT (" (%t) Dispatcher Thread started!\n")));
155 ACE_Reactor *r = reinterpret_cast <ACE_Reactor *> (arg);
156 int result;
158 r->owner (ACE_OS::thr_self ());
160 while (1)
162 result = r->handle_events ();
164 if (result <= 0)
165 ACE_DEBUG ((LM_DEBUG,
166 ACE_TEXT ("Dispatch: handle_events (): %d"),
167 result));
170 ACE_NOTREACHED (return 0);
173 extern "C" void
174 handler (int)
176 done = 1;
180 ACE_TMAIN (int argc, ACE_TCHAR *[])
182 if (argc > 1)
184 // Send output to file.
185 #if !defined (ACE_LACKS_IOSTREAM_TOTALLY)
186 ACE_NEW_RETURN (out_stream,
187 ofstream ("test_task_three.out",
188 ios::trunc|ios::out),
189 -1);
190 #else
191 if ((out_stream = ACE_OS::fopen ("test_task_three.out", "w")) == 0)
192 return -1;
193 #endif
194 ACE_LOG_MSG->set_flags (ACE_Log_Msg::OSTREAM);
195 ACE_LOG_MSG->msg_ostream (out_stream);
198 // Register a signal handler.
199 ACE_Sig_Action sa ((ACE_SignalHandler) handler, SIGINT);
200 ACE_UNUSED_ARG (sa);
202 ACE_Reactor *reactor1 = ACE_Reactor::instance ();
203 ACE_Reactor *reactor2 = new ACE_Reactor ();
205 Test_Task t1[TASK_COUNT];
206 Test_Task t2[TASK_COUNT];
208 ACE_Thread::spawn (ACE_THR_FUNC (dispatch),
209 reactor2);
211 reactor1->owner (ACE_OS::thr_self ());
213 for (size_t index = 0; index < TASK_COUNT; index++)
215 t1[index].open (reactor1);
216 t2[index].open (reactor2);
219 ACE_OS::sleep (3);
221 while (done == 0)
223 ACE_Time_Value timeout (2);
225 if (reactor1->handle_events (timeout) <= 0)
227 if (errno == ETIME)
229 ACE_DEBUG ((LM_DEBUG,
230 ACE_TEXT ("no activity within 2 seconds, shutting down\n")));
231 break;
233 else
234 ACE_ERROR ((LM_ERROR,
235 ACE_TEXT ("%p error handling events\n"),
236 ACE_TEXT ("main")));
240 if (argc > 1)
242 #if !defined (ACE_LACKS_IOSTREAM_TOTALLY)
243 *out_stream << flush;
244 delete out_stream;
245 #else
246 ACE_OS::fflush(out_stream);
247 ACE_OS::fclose(out_stream);
248 #endif
249 ACE_LOG_MSG->clr_flags (ACE_Log_Msg::OSTREAM);
250 ACE_LOG_MSG->msg_ostream (0);
253 // Bail out here so that we don't call the destructors for the tasks..
254 ACE_OS::exit (0);
255 /* NOTREACHED */
257 return 0;
260 #else
262 ACE_TMAIN (int, ACE_TCHAR *[])
264 ACE_ERROR ((LM_ERROR,
265 ACE_TEXT ("threads not supported on this platform\n")));
266 return 0;
268 #endif /* ACE_HAS_THREADS */