1 #include "ace/Get_Opt.h"
3 #include "ace/Containers_T.h"
4 #include "ace/Reactor.h"
5 #include "ace/INet/HTTP_URL.h"
6 #include "ace/INet/HTTP_ClientRequestHandler.h"
7 #include "ace/INet/String_IOStream.h"
8 #include "ace/Truncate.h"
13 #if defined (ACE_HAS_THREADS)
14 class Get_Task
: public ACE_Task
<ACE_MT_SYNCH
>
17 Get_Task (ACE_Thread_Manager
*thr_mgr
,
22 const ACE_Array
<ACE_CString
>& results ();
28 ACE_Array
<ACE_CString
> results_
;
29 ACE_SYNCH_MUTEX lock_
;
32 Get_Task::Get_Task (ACE_Thread_Manager
*thr_mgr
,
34 : ACE_Task
<ACE_MT_SYNCH
> (thr_mgr
),
35 n_threads_ (n_threads
)
37 // Create worker threads.
38 if (this->activate (THR_NEW_LWP
, n_threads_
) == -1)
39 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("%p\n"), ACE_TEXT ("activate failed")));
41 ACE_DEBUG ((LM_INFO
, ACE_TEXT ("(%P|%t) started %d threads\n"), n_threads_
));
44 void Get_Task::shutdown ()
46 ACE_GUARD (ACE_SYNCH_MUTEX
,
53 ACE_DEBUG ((LM_INFO
, ACE_TEXT ("(%P|%t) ending event loop\n")));
54 ACE_Reactor::instance ()->end_event_loop ();
60 ACE_DEBUG ((LM_INFO
, ACE_TEXT ("(%P|%t) task started\n")));
62 ACE::HTTP::URL http_url
;
63 if (http_url
.parse ("http://www.remedy.nl"))
65 ACE::HTTP::ClientRequestHandler rh
;
66 ACE::INet::URLStream urlin
= http_url
.open (rh
);
69 ACE_DEBUG ((LM_INFO
, ACE_TEXT ("(%P|%t) opened URL %C\n"), http_url
.to_string ().c_str ()));
71 ACE::IOS::CString_OStream sos
;
72 sos
<< urlin
->rdbuf ();
73 ACE_DEBUG ((LM_INFO
, ACE_TEXT ("(%P|%t) result downloaded\n")));
76 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX
,
80 ACE_DEBUG ((LM_INFO
, ACE_TEXT ("(%P|%t) storing result\n")));
81 results_
.size (results_
.size () + 1);
82 results_
[results_
.size ()-1] = sos
.str ();
86 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("(%P|%t) failed to open URL %C\n"), http_url
.to_string ().c_str ()));
91 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("(%P|%t) failed to parse URL : result = %C\n"),
92 http_url
.to_string ().c_str ()));
95 ACE_DEBUG ((LM_INFO
, ACE_TEXT ("(%P|%t) task finished\n")));
100 const ACE_Array
<ACE_CString
>& Get_Task::results ()
105 class Get_MultiTask
: public ACE_Task
<ACE_MT_SYNCH
>
108 Get_MultiTask (ACE_Thread_Manager
*thr_mgr
);
115 const char* get_url ();
117 void wait_for_all ();
122 int n_thread_starts_
;
125 ACE_SYNCH_MUTEX lock_
;
126 ACE_SYNCH_CONDITION signal_
;
129 const char* urls_
[] = {
130 "http://www.theaceorb.nl",
131 "http://www.remedy.nl",
132 "http://www.google.com"
135 Get_MultiTask::Get_MultiTask (ACE_Thread_Manager
*thr_mgr
)
136 : ACE_Task
<ACE_MT_SYNCH
> (thr_mgr
),
137 n_threads_ (sizeof (urls_
)/sizeof (const char*)),
138 n_thread_starts_ (0),
140 n_fails_ (n_threads_
),
143 // Create worker threads.
144 if (this->activate (THR_NEW_LWP
, n_threads_
) == -1)
145 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("%p\n"), ACE_TEXT ("activate failed")));
148 const char* Get_MultiTask::get_url ()
150 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX
,
155 const char* url
= urls_
[n_thread_starts_
];
160 void Get_MultiTask::wait_for_all ()
162 ACE_GUARD (ACE_SYNCH_MUTEX
,
168 if (n_open_urls_
== n_threads_
)
169 signal_
.broadcast (); // signal all threads that all urls are opened
171 signal_
.wait (); // wait for all urls to be opened
174 void Get_MultiTask::shutdown ()
176 ACE_GUARD (ACE_SYNCH_MUTEX
,
183 ACE_DEBUG ((LM_INFO
, ACE_TEXT ("(%P|%t) ending event loop\n")));
184 ACE_Reactor::instance ()->end_event_loop ();
188 int Get_MultiTask::svc ()
190 ACE_DEBUG ((LM_INFO
, ACE_TEXT ("(%P|%t) multitask started\n")));
192 ACE::HTTP::URL http_url
;
193 const char* url
= get_url ();
195 ACE_DEBUG ((LM_INFO
, ACE_TEXT ("(%P|%t) parsing URL %C\n"), url
));
197 if (http_url
.parse (url
))
199 ACE::HTTP::ClientRequestHandler rh
;
201 if (rh
.response_stream ())
202 ACE_DEBUG ((LM_INFO
, ACE_TEXT ("(%P|%t) opened URL %C\n"), http_url
.to_string ().c_str ()));
204 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("(%P|%t) failed to open URL %C\n"), http_url
.to_string ().c_str ()));
206 if (rh
.response_stream ())
208 ACE::IOS::CString_OStream sos
;
209 sos
<< rh
.response_stream ().rdbuf ();
210 ACE_DEBUG ((LM_INFO
, ACE_TEXT ("(%P|%t) result downloaded\n")));
213 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX
,
217 ACE_DEBUG ((LM_INFO
, ACE_TEXT ("(%P|%t) marking result\n")));
224 ACE_DEBUG ((LM_INFO
, ACE_TEXT ("(%P|%t) task finished\n")));
229 int Get_MultiTask::failures ()
237 ACE_TMAIN (int, ACE_TCHAR
*[])
239 #if defined (ACE_HAS_THREADS)
240 ACE_DEBUG ((LM_INFO
, ACE_TEXT ("(%P|%t) MGet_Test started\n")));
242 ACE_Reactor
* reactor
= ACE_Reactor::instance ();
245 ACE_DEBUG ((LM_INFO
, ACE_TEXT ("(%P|%t) starting serialized test\n")));
247 Get_Task
get_task (ACE_Thread_Manager::instance (),
250 // run event loop until ended by last task thread
251 reactor
->run_event_loop ();
253 // all threads really ended?
254 ACE_Thread_Manager::instance ()->wait ();
256 // All threads should have resulted in the same data...
257 if (get_task
.results ().size () != ACE_Utils::truncate_cast
<size_t> (n_threads
))
259 ACE_ERROR ((LM_ERROR
,
260 ACE_TEXT ("%d results found; ")
261 ACE_TEXT ("should be %d\n"),
262 get_task
.results ().size (),
267 const ACE_Array
<ACE_CString
>& results
= get_task
.results ();
272 if (results
[0] != results
[i
])
274 ACE_ERROR ((LM_ERROR
,
275 ACE_TEXT ("result %d does not match other results\n"),
282 ACE_DEBUG ((LM_INFO
, ACE_TEXT ("(%P|%t) starting parallelized test\n")));
284 reactor
->reset_event_loop ();
286 Get_MultiTask
get_multi_task (ACE_Thread_Manager::instance ());
288 // run event loop until ended by last task thread
289 reactor
->run_event_loop ();
291 // all threads really ended?
292 ACE_Thread_Manager::instance ()->wait ();
294 // All threads should have resulted in data...
295 if (get_multi_task
.failures () > 0)
297 ACE_ERROR ((LM_ERROR
,
298 ACE_TEXT ("%d failures found"),
299 get_multi_task
.failures ()));
302 ACE_DEBUG ((LM_INFO
, ACE_TEXT ("(%P|%t) MGet_Test finished\n")));
305 ACE_TEXT ("threads not supported on this platform\n")));
306 #endif /* ACE_HAS_THREADS */