Merge branch 'master' into jwi-bcc64xsingletonwarning
[ACE_TAO.git] / ACE / examples / Threads / future2.cpp
blobd883c747420f067c9eac35b73479dedd5a7c9d46
2 //=============================================================================
3 /**
4 * @file future2.cpp
6 * This example tests the ACE Future.
8 * @author Andres Kruse <Andres.Kruse@cern.ch> and Douglas C. Schmidt <d.schmidt@vanderbilt.edu> and "Method_Request_work". - make the methods "work_i" and "name_i" private the tests so they are more modular.
9 */
10 //=============================================================================
13 #include "ace/OS_NS_string.h"
14 #include "ace/OS_NS_sys_time.h"
15 #include "ace/OS_NS_unistd.h"
16 #include "ace/OS_main.h"
17 #include "ace/ACE.h"
18 #include "ace/Task.h"
19 #include "ace/Message_Queue.h"
20 #include "ace/Future.h"
21 #include "ace/Method_Request.h"
22 #include "ace/Activation_Queue.h"
23 #include "ace/Atomic_Op.h"
24 #include <memory>
26 #if defined (ACE_HAS_THREADS)
28 typedef ACE_Atomic_Op<ACE_Thread_Mutex, int> ATOMIC_INT;
30 // a counter for the tasks..
31 static ATOMIC_INT scheduler_open_count (0);
33 // forward declarations
34 class Method_Request_work;
35 class Method_Request_name;
37 /**
38 * @class Scheduler
40 * @brief Active Object Scheduler.
42 class Scheduler : public ACE_Task_Base
44 // Every method object has to be able to access the private methods.
46 friend class Method_Request_work;
47 friend class Method_Request_name;
48 friend class Method_Request_end;
49 public:
50 Scheduler (const char *, Scheduler * = 0);
51 virtual ~Scheduler ();
53 //FUZZ: disable check_for_lack_ACE_OS
54 /// The method that is used to start the active object.
55 ///FUZZ: enable check_for_lack_ACE_OS
56 virtual int open (void *args = 0);
58 // = Here are the methods exported by the class. They return an
59 // <ACE_Future>.
60 ACE_Future<u_long> work (u_long param, int count = 1);
61 ACE_Future<char*> name ();
62 void end ();
64 private:
65 //FUZZ: disable check_for_lack_ACE_OS
66 /// Should not be accessible from outside... (use end () instead).
67 ///FUZZ: enable check_for_lack_ACE_OS
68 virtual int close (u_long flags = 0);
70 /// Here the actual servicing of all requests is happening..
71 virtual int svc ();
73 // = Implementation methods.
74 u_long work_i (u_long, int);
75 char *name_i ();
77 char *name_;
78 ACE_Activation_Queue activation_queue_;
79 Scheduler *scheduler_;
82 /**
83 * @class Method_Request_work
85 * @brief Reification of the <work> method.
87 class Method_Request_work : public ACE_Method_Request
89 public:
90 Method_Request_work (Scheduler *, u_long, int, ACE_Future<u_long> &);
91 virtual ~Method_Request_work ();
92 virtual int call ();
94 private:
95 Scheduler *scheduler_;
96 u_long param_;
97 int count_;
98 ACE_Future<u_long> future_result_;
101 Method_Request_work::Method_Request_work (Scheduler* new_Scheduler,
102 u_long new_param,
103 int new_count,
104 ACE_Future<u_long> &new_result)
105 : scheduler_ (new_Scheduler),
106 param_ (new_param),
107 count_ (new_count),
108 future_result_ (new_result)
112 Method_Request_work::~Method_Request_work ()
117 Method_Request_work::call ()
119 return this->future_result_.set (this->scheduler_->work_i (this->param_, this->count_));
123 * @class Method_Request_name
125 * @brief Reification of the <name> method.
127 class Method_Request_name : public ACE_Method_Request
129 public:
130 Method_Request_name (Scheduler *, ACE_Future<char*> &);
131 virtual ~Method_Request_name ();
132 virtual int call ();
134 private:
135 Scheduler *scheduler_;
136 ACE_Future<char*> future_result_;
140 Method_Request_name::Method_Request_name (Scheduler *new_scheduler,
141 ACE_Future<char*> &new_result)
142 : scheduler_ (new_scheduler),
143 future_result_ (new_result)
145 ACE_DEBUG ((LM_DEBUG,
146 " (%t) Method_Request_name created\n"));
149 Method_Request_name::~Method_Request_name ()
151 ACE_DEBUG ((LM_DEBUG,
152 " (%t) Method_Request_name will be deleted.\n"));
156 Method_Request_name::call ()
158 return future_result_.set (scheduler_->name_i ());
162 * @class Method_Request_end
164 * @brief Reification of the <end> method.
166 class Method_Request_end : public ACE_Method_Request
168 public:
169 Method_Request_end (Scheduler *new_Scheduler): scheduler_ (new_Scheduler) {}
170 virtual ~Method_Request_end () {}
171 virtual int call () { return -1; }
173 private:
174 /// Keep track of our scheduler.
175 Scheduler *scheduler_;
178 // constructor
179 Scheduler::Scheduler (const char *newname, Scheduler *new_Scheduler)
181 ACE_NEW (this->name_, char[ACE_OS::strlen (newname) + 1]);
182 ACE_OS::strcpy ((char *) this->name_, newname);
183 this->scheduler_ = new_Scheduler;
184 ACE_DEBUG ((LM_DEBUG, " (%t) Scheduler %s created\n", this->name_));
187 // Destructor
188 Scheduler::~Scheduler ()
190 ACE_DEBUG ((LM_DEBUG, " (%t) Scheduler %s will be destroyed\n", this->name_));
194 Scheduler::open (void *)
196 scheduler_open_count++;
197 ACE_DEBUG ((LM_DEBUG, " (%t) Scheduler %s open\n", this->name_));
198 return this->activate (THR_BOUND);
202 Scheduler::close (u_long)
204 ACE_DEBUG ((LM_DEBUG, " (%t) Scheduler %s close\n", this->name_));
205 scheduler_open_count--;
206 return 0;
210 Scheduler::svc ()
212 // Main event loop for this active object.
213 for (;;)
215 // Dequeue the next method object (we use an unique pointer in
216 // case an exception is thrown in the <call>).
217 std::unique_ptr<ACE_Method_Request> mo (this->activation_queue_.dequeue ());
219 ACE_DEBUG ((LM_DEBUG, " (%t) calling method object\n"));
220 // Call it.
221 if (mo->call () == -1)
222 break;
223 // Smart pointer destructor automatically deletes mo.
226 /* NOTREACHED */
227 return 0;
230 void
231 Scheduler::end ()
233 this->activation_queue_.enqueue (new Method_Request_end (this));
236 // Here's where the Work takes place.
237 u_long
238 Scheduler::work_i (u_long param,
239 int count)
241 ACE_UNUSED_ARG (count);
243 return ACE::is_prime (param, 2, param / 2);
246 char *
247 Scheduler::name_i ()
249 char *the_name;
251 ACE_NEW_RETURN (the_name, char[ACE_OS::strlen (this->name_) + 1], 0);
252 ACE_OS::strcpy (the_name, this->name_);
254 return the_name;
257 ACE_Future<char *>
258 Scheduler::name ()
260 if (this->scheduler_)
261 // Delegate to the other scheduler
262 return this->scheduler_->name ();
263 else
265 ACE_Future<char*> new_future;
267 if (this->thr_count () == 0)
269 // This scheduler is inactive... so we execute the user
270 // request right away...
271 std::unique_ptr<ACE_Method_Request> mo (new Method_Request_name (this, new_future));
273 mo->call ();
274 // Smart pointer destructor automatically deletes mo.
276 else
277 // @@ What happens if new fails here?
278 this->activation_queue_.enqueue
279 (new Method_Request_name (this, new_future));
281 return new_future;
285 ACE_Future<u_long>
286 Scheduler::work (u_long newparam, int newcount)
288 if (this->scheduler_)
289 return this->scheduler_->work (newparam, newcount);
290 else
292 ACE_Future<u_long> new_future;
294 if (this->thr_count () == 0)
296 std::unique_ptr<ACE_Method_Request> mo
297 (new Method_Request_work (this, newparam, newcount, new_future));
298 mo->call ();
299 // Smart pointer destructor automatically deletes it.
301 else
302 this->activation_queue_.enqueue
303 (new Method_Request_work (this, newparam, newcount, new_future));
305 return new_future;
309 static int
310 determine_iterations ()
312 int n_iterations;
314 ACE_DEBUG ((LM_DEBUG," (%t) determining the number of iterations...\n"));
315 Scheduler *worker_a;
317 ACE_NEW_RETURN (worker_a, Scheduler ("worker A"), -1);
319 ACE_Time_Value tstart (ACE_OS::gettimeofday ());
320 ACE_Time_Value tend (ACE_OS::gettimeofday ());
322 // Determine the number of iterations... we want so many that the
323 // work () takes about 1 second...
325 for (n_iterations = 1;
326 (tend.sec () - tstart.sec ()) < 1;
327 n_iterations *= 2)
329 tstart = ACE_OS::gettimeofday ();
331 worker_a->work (9013, n_iterations);
333 tend = ACE_OS::gettimeofday ();
336 ACE_DEBUG ((LM_DEBUG," (%t) n_iterations %d\n",
337 (int) n_iterations));
339 worker_a->end ();
340 // @@ Can we safely delete worker_a here?
341 return n_iterations;
344 static void
345 test_active_object (int n_iterations)
347 ACE_UNUSED_ARG (n_iterations);
349 ACE_DEBUG ((LM_DEBUG," (%t) testing active object pattern...\n"));
350 // A simple example for the use of the active object pattern and
351 // futures to return values from an active object.
353 Scheduler *worker_a;
354 Scheduler *worker_b;
355 Scheduler *worker_c;
357 ACE_NEW (worker_a, Scheduler ("worker A"));
358 ACE_NEW (worker_b, Scheduler ("worker B"));
359 // Have worker_c delegate his work to worker_a.
360 ACE_NEW (worker_c, Scheduler ("worker C", worker_a));
362 // loop 0:
363 // test the Schedulers when they are not active.
364 // now the method objects will be created but since
365 // there is no active thread they will also be
366 // immediately executed, in the "main" thread.
367 // loop 1:
368 // do the same test but with the schedulers
369 // activated
370 for (int i = 0; i < 2; i++)
372 if (i == 1)
374 worker_a->open ();
375 worker_b->open ();
376 worker_c->open ();
379 ACE_Future<u_long> fresulta = worker_a->work (9013);
380 ACE_Future<u_long> fresultb = worker_b->work (9013);
381 ACE_Future<u_long> fresultc = worker_c->work (9013);
383 if (i == 0)
385 if (!fresulta.ready ())
386 ACE_DEBUG ((LM_DEBUG," (%t) ERROR: worker A is should be ready!!!\n"));
387 if (!fresultb.ready ())
388 ACE_DEBUG ((LM_DEBUG," (%t) ERROR: worker B is should be ready!!!\n"));
389 if (!fresultc.ready ())
390 ACE_DEBUG ((LM_DEBUG," (%t) ERROR: worker C is should be ready!!!\n"));
393 // When the workers are active we will block here until the
394 // results are available.
396 u_long resulta = fresulta;
397 u_long resultb = fresultb;
398 u_long resultc = fresultc;
400 ACE_Future<char *> fnamea = worker_a->name ();
401 ACE_Future<char *> fnameb = worker_b->name ();
402 ACE_Future<char *> fnamec = worker_c->name ();
404 char *namea = fnamea;
405 char *nameb = fnameb;
406 char *namec = fnamec;
408 ACE_DEBUG ((LM_DEBUG, " (%t) result from %s %u\n",
409 namea, (u_int) resulta));
410 ACE_DEBUG ((LM_DEBUG, " (%t) result from %s %u\n",
411 nameb, (u_int) resultb));
412 ACE_DEBUG ((LM_DEBUG, " (%t) result from %s %u\n",
413 namec, (u_int) resultc));
416 ACE_DEBUG ((LM_DEBUG, " (%t) scheduler_open_count %d before end ()\n",
417 scheduler_open_count.value ()));
419 worker_a->end ();
420 worker_b->end ();
421 worker_c->end ();
423 ACE_DEBUG ((LM_DEBUG, " (%t) scheduler_open_count %d immediately after end ()\n",
424 scheduler_open_count.value ()));
426 ACE_OS::sleep (2);
428 ACE_DEBUG ((LM_DEBUG, " (%t) scheduler_open_count %d after waiting\n",
429 scheduler_open_count.value ()));
430 // @@ Can we safely delete worker_a, worker_b, and worker_c?
433 static void
434 test_cancellation (int n_iterations)
436 ACE_DEBUG ((LM_DEBUG," (%t) testing cancellation of a future...\n"));
438 // Now test the cancelling a future.
440 Scheduler *worker_a;
441 ACE_NEW (worker_a, Scheduler ("worker A"));
442 worker_a->open ();
444 ACE_Future<u_long> fresulta = worker_a->work (9013, n_iterations);
446 // save the result by copying the future
447 ACE_Future<u_long> fresultb = fresulta;
449 // now we cancel the first future.. but the
450 // calculation will still go on...
451 fresulta.cancel (10);
453 if (!fresulta.ready ())
454 ACE_DEBUG ((LM_DEBUG," (%t) ERROR: future A is should be ready!!!\n"));
456 u_long resulta = fresulta;
458 ACE_DEBUG ((LM_DEBUG, " (%t) cancelled result %u\n", (u_int) resulta));
460 if (resulta != 10)
461 ACE_DEBUG ((LM_DEBUG, " (%t) cancelled result should be 10!!\n", resulta));
463 resulta = fresultb;
465 ACE_DEBUG ((LM_DEBUG, " (%t) true result %u\n", (u_int) resulta));
467 worker_a->end ();
468 // @@ Can we safely delete worker_a here?
471 static void
472 test_timeout (int n_iterations)
474 ACE_DEBUG ((LM_DEBUG," (%t) testing timeout on waiting for the result...\n"));
475 Scheduler *worker_a;
476 ACE_NEW (worker_a, Scheduler ("worker A"));
477 worker_a->open ();
479 ACE_Future<u_long> fresulta = worker_a->work (9013, 2 * n_iterations);
481 // Should immediately return... and we should see an error...
482 ACE_Time_Value *delay;
483 ACE_NEW (delay, ACE_Time_Value (1));
485 u_long resulta = 0;
486 fresulta.get (resulta, delay);
488 if (fresulta.ready ())
489 ACE_DEBUG ((LM_DEBUG," (%t) ERROR: future A is should not be ready!!!\n"));
490 else
491 ACE_DEBUG ((LM_DEBUG," (%t) timed out on future A\n"));
493 // now we wait until we are done...
494 fresulta.get (resulta);
495 ACE_DEBUG ((LM_DEBUG, " (%t) result %u\n", (u_int) resulta));
497 worker_a->end ();
498 // @@ Can we safely delete worker_a here?
502 ACE_TMAIN (int, ACE_TCHAR *[])
504 int n_iterations = determine_iterations ();
506 test_active_object (n_iterations);
507 test_cancellation (n_iterations);
508 test_timeout (n_iterations);
510 ACE_DEBUG ((LM_DEBUG," (%t) that's all folks!\n"));
512 ACE_OS::sleep (5);
513 return 0;
516 #else
518 ACE_TMAIN (int, ACE_TCHAR *[])
520 ACE_ERROR ((LM_ERROR, "threads not supported on this platform\n"));
521 return 0;
523 #endif /* ACE_HAS_THREADS */