//=============================================================================
/**
 *  This example tests the ACE Future.
 *
 *  @author Andres Kruse <Andres.Kruse@cern.ch> and
 *          Douglas C. Schmidt <d.schmidt@vanderbilt.edu>
 *
 *  Revision notes (only fragments survive in this copy):
 *  - ... and "Method_Request_work".
 *  - make the methods "work_i" and "name_i" private
 *  - ... the tests so they are more modular.
 */
//=============================================================================
13 #include "ace/OS_NS_string.h"
14 #include "ace/OS_NS_sys_time.h"
15 #include "ace/OS_NS_unistd.h"
16 #include "ace/OS_main.h"
19 #include "ace/Message_Queue.h"
20 #include "ace/Future.h"
21 #include "ace/Method_Request.h"
22 #include "ace/Activation_Queue.h"
23 #include "ace/Atomic_Op.h"
26 #if defined (ACE_HAS_THREADS)
28 typedef ACE_Atomic_Op
<ACE_Thread_Mutex
, int> ATOMIC_INT
;
30 // a counter for the tasks..
31 static ATOMIC_INT
scheduler_open_count (0);
33 // forward declarations
34 class Method_Request_work
;
35 class Method_Request_name
;
40 * @brief Active Object Scheduler.
42 class Scheduler
: public ACE_Task_Base
44 // Every method object has to be able to access the private methods.
46 friend class Method_Request_work
;
47 friend class Method_Request_name
;
48 friend class Method_Request_end
;
50 Scheduler (const char *, Scheduler
* = 0);
51 virtual ~Scheduler ();
53 //FUZZ: disable check_for_lack_ACE_OS
54 /// The method that is used to start the active object.
55 ///FUZZ: enable check_for_lack_ACE_OS
56 virtual int open (void *args
= 0);
58 // = Here are the methods exported by the class. They return an
60 ACE_Future
<u_long
> work (u_long param
, int count
= 1);
61 ACE_Future
<char*> name ();
65 //FUZZ: disable check_for_lack_ACE_OS
66 /// Should not be accessible from outside... (use end () instead).
67 ///FUZZ: enable check_for_lack_ACE_OS
68 virtual int close (u_long flags
= 0);
70 /// Here the actual servicing of all requests is happening..
73 // = Implementation methods.
74 u_long
work_i (u_long
, int);
78 ACE_Activation_Queue activation_queue_
;
79 Scheduler
*scheduler_
;
83 * @class Method_Request_work
85 * @brief Reification of the <work> method.
87 class Method_Request_work
: public ACE_Method_Request
90 Method_Request_work (Scheduler
*, u_long
, int, ACE_Future
<u_long
> &);
91 virtual ~Method_Request_work ();
95 Scheduler
*scheduler_
;
98 ACE_Future
<u_long
> future_result_
;
101 Method_Request_work::Method_Request_work (Scheduler
* new_Scheduler
,
104 ACE_Future
<u_long
> &new_result
)
105 : scheduler_ (new_Scheduler
),
108 future_result_ (new_result
)
112 Method_Request_work::~Method_Request_work ()
117 Method_Request_work::call ()
119 return this->future_result_
.set (this->scheduler_
->work_i (this->param_
, this->count_
));
123 * @class Method_Request_name
125 * @brief Reification of the <name> method.
127 class Method_Request_name
: public ACE_Method_Request
130 Method_Request_name (Scheduler
*, ACE_Future
<char*> &);
131 virtual ~Method_Request_name ();
135 Scheduler
*scheduler_
;
136 ACE_Future
<char*> future_result_
;
140 Method_Request_name::Method_Request_name (Scheduler
*new_scheduler
,
141 ACE_Future
<char*> &new_result
)
142 : scheduler_ (new_scheduler
),
143 future_result_ (new_result
)
145 ACE_DEBUG ((LM_DEBUG
,
146 " (%t) Method_Request_name created\n"));
149 Method_Request_name::~Method_Request_name ()
151 ACE_DEBUG ((LM_DEBUG
,
152 " (%t) Method_Request_name will be deleted.\n"));
156 Method_Request_name::call ()
158 return future_result_
.set (scheduler_
->name_i ());
162 * @class Method_Request_end
164 * @brief Reification of the <end> method.
166 class Method_Request_end
: public ACE_Method_Request
169 Method_Request_end (Scheduler
*new_Scheduler
): scheduler_ (new_Scheduler
) {}
170 virtual ~Method_Request_end () {}
171 virtual int call () { return -1; }
174 /// Keep track of our scheduler.
175 Scheduler
*scheduler_
;
179 Scheduler::Scheduler (const char *newname
, Scheduler
*new_Scheduler
)
181 ACE_NEW (this->name_
, char[ACE_OS::strlen (newname
) + 1]);
182 ACE_OS::strcpy ((char *) this->name_
, newname
);
183 this->scheduler_
= new_Scheduler
;
184 ACE_DEBUG ((LM_DEBUG
, " (%t) Scheduler %s created\n", this->name_
));
188 Scheduler::~Scheduler ()
190 ACE_DEBUG ((LM_DEBUG
, " (%t) Scheduler %s will be destroyed\n", this->name_
));
194 Scheduler::open (void *)
196 scheduler_open_count
++;
197 ACE_DEBUG ((LM_DEBUG
, " (%t) Scheduler %s open\n", this->name_
));
198 return this->activate (THR_BOUND
);
202 Scheduler::close (u_long
)
204 ACE_DEBUG ((LM_DEBUG
, " (%t) Scheduler %s close\n", this->name_
));
205 scheduler_open_count
--;
212 // Main event loop for this active object.
215 // Dequeue the next method object (we use an unique pointer in
216 // case an exception is thrown in the <call>).
217 std::unique_ptr
<ACE_Method_Request
> mo (this->activation_queue_
.dequeue ());
219 ACE_DEBUG ((LM_DEBUG
, " (%t) calling method object\n"));
221 if (mo
->call () == -1)
223 // Smart pointer destructor automatically deletes mo.
233 this->activation_queue_
.enqueue (new Method_Request_end (this));
236 // Here's where the Work takes place.
238 Scheduler::work_i (u_long param
,
241 ACE_UNUSED_ARG (count
);
243 return ACE::is_prime (param
, 2, param
/ 2);
251 ACE_NEW_RETURN (the_name
, char[ACE_OS::strlen (this->name_
) + 1], 0);
252 ACE_OS::strcpy (the_name
, this->name_
);
260 if (this->scheduler_
)
261 // Delegate to the other scheduler
262 return this->scheduler_
->name ();
265 ACE_Future
<char*> new_future
;
267 if (this->thr_count () == 0)
269 // This scheduler is inactive... so we execute the user
270 // request right away...
271 std::unique_ptr
<ACE_Method_Request
> mo (new Method_Request_name (this, new_future
));
274 // Smart pointer destructor automatically deletes mo.
277 // @@ What happens if new fails here?
278 this->activation_queue_
.enqueue
279 (new Method_Request_name (this, new_future
));
286 Scheduler::work (u_long newparam
, int newcount
)
288 if (this->scheduler_
)
289 return this->scheduler_
->work (newparam
, newcount
);
292 ACE_Future
<u_long
> new_future
;
294 if (this->thr_count () == 0)
296 std::unique_ptr
<ACE_Method_Request
> mo
297 (new Method_Request_work (this, newparam
, newcount
, new_future
));
299 // Smart pointer destructor automatically deletes it.
302 this->activation_queue_
.enqueue
303 (new Method_Request_work (this, newparam
, newcount
, new_future
));
310 determine_iterations ()
314 ACE_DEBUG ((LM_DEBUG
," (%t) determining the number of iterations...\n"));
317 ACE_NEW_RETURN (worker_a
, Scheduler ("worker A"), -1);
319 ACE_Time_Value
tstart (ACE_OS::gettimeofday ());
320 ACE_Time_Value
tend (ACE_OS::gettimeofday ());
322 // Determine the number of iterations... we want so many that the
323 // work () takes about 1 second...
325 for (n_iterations
= 1;
326 (tend
.sec () - tstart
.sec ()) < 1;
329 tstart
= ACE_OS::gettimeofday ();
331 worker_a
->work (9013, n_iterations
);
333 tend
= ACE_OS::gettimeofday ();
336 ACE_DEBUG ((LM_DEBUG
," (%t) n_iterations %d\n",
337 (int) n_iterations
));
340 // @@ Can we safely delete worker_a here?
345 test_active_object (int n_iterations
)
347 ACE_UNUSED_ARG (n_iterations
);
349 ACE_DEBUG ((LM_DEBUG
," (%t) testing active object pattern...\n"));
350 // A simple example for the use of the active object pattern and
351 // futures to return values from an active object.
357 ACE_NEW (worker_a
, Scheduler ("worker A"));
358 ACE_NEW (worker_b
, Scheduler ("worker B"));
359 // Have worker_c delegate his work to worker_a.
360 ACE_NEW (worker_c
, Scheduler ("worker C", worker_a
));
363 // test the Schedulers when they are not active.
364 // now the method objects will be created but since
365 // there is no active thread they will also be
366 // immediately executed, in the "main" thread.
368 // do the same test but with the schedulers
370 for (int i
= 0; i
< 2; i
++)
379 ACE_Future
<u_long
> fresulta
= worker_a
->work (9013);
380 ACE_Future
<u_long
> fresultb
= worker_b
->work (9013);
381 ACE_Future
<u_long
> fresultc
= worker_c
->work (9013);
385 if (!fresulta
.ready ())
386 ACE_DEBUG ((LM_DEBUG
," (%t) ERROR: worker A is should be ready!!!\n"));
387 if (!fresultb
.ready ())
388 ACE_DEBUG ((LM_DEBUG
," (%t) ERROR: worker B is should be ready!!!\n"));
389 if (!fresultc
.ready ())
390 ACE_DEBUG ((LM_DEBUG
," (%t) ERROR: worker C is should be ready!!!\n"));
393 // When the workers are active we will block here until the
394 // results are available.
396 u_long resulta
= fresulta
;
397 u_long resultb
= fresultb
;
398 u_long resultc
= fresultc
;
400 ACE_Future
<char *> fnamea
= worker_a
->name ();
401 ACE_Future
<char *> fnameb
= worker_b
->name ();
402 ACE_Future
<char *> fnamec
= worker_c
->name ();
404 char *namea
= fnamea
;
405 char *nameb
= fnameb
;
406 char *namec
= fnamec
;
408 ACE_DEBUG ((LM_DEBUG
, " (%t) result from %s %u\n",
409 namea
, (u_int
) resulta
));
410 ACE_DEBUG ((LM_DEBUG
, " (%t) result from %s %u\n",
411 nameb
, (u_int
) resultb
));
412 ACE_DEBUG ((LM_DEBUG
, " (%t) result from %s %u\n",
413 namec
, (u_int
) resultc
));
416 ACE_DEBUG ((LM_DEBUG
, " (%t) scheduler_open_count %d before end ()\n",
417 scheduler_open_count
.value ()));
423 ACE_DEBUG ((LM_DEBUG
, " (%t) scheduler_open_count %d immediately after end ()\n",
424 scheduler_open_count
.value ()));
428 ACE_DEBUG ((LM_DEBUG
, " (%t) scheduler_open_count %d after waiting\n",
429 scheduler_open_count
.value ()));
430 // @@ Can we safely delete worker_a, worker_b, and worker_c?
434 test_cancellation (int n_iterations
)
436 ACE_DEBUG ((LM_DEBUG
," (%t) testing cancellation of a future...\n"));
438 // Now test the cancelling a future.
441 ACE_NEW (worker_a
, Scheduler ("worker A"));
444 ACE_Future
<u_long
> fresulta
= worker_a
->work (9013, n_iterations
);
446 // save the result by copying the future
447 ACE_Future
<u_long
> fresultb
= fresulta
;
449 // now we cancel the first future.. but the
450 // calculation will still go on...
451 fresulta
.cancel (10);
453 if (!fresulta
.ready ())
454 ACE_DEBUG ((LM_DEBUG
," (%t) ERROR: future A is should be ready!!!\n"));
456 u_long resulta
= fresulta
;
458 ACE_DEBUG ((LM_DEBUG
, " (%t) cancelled result %u\n", (u_int
) resulta
));
461 ACE_DEBUG ((LM_DEBUG
, " (%t) cancelled result should be 10!!\n", resulta
));
465 ACE_DEBUG ((LM_DEBUG
, " (%t) true result %u\n", (u_int
) resulta
));
468 // @@ Can we safely delete worker_a here?
472 test_timeout (int n_iterations
)
474 ACE_DEBUG ((LM_DEBUG
," (%t) testing timeout on waiting for the result...\n"));
476 ACE_NEW (worker_a
, Scheduler ("worker A"));
479 ACE_Future
<u_long
> fresulta
= worker_a
->work (9013, 2 * n_iterations
);
481 // Should immediately return... and we should see an error...
482 ACE_Time_Value
*delay
;
483 ACE_NEW (delay
, ACE_Time_Value (1));
486 fresulta
.get (resulta
, delay
);
488 if (fresulta
.ready ())
489 ACE_DEBUG ((LM_DEBUG
," (%t) ERROR: future A is should not be ready!!!\n"));
491 ACE_DEBUG ((LM_DEBUG
," (%t) timed out on future A\n"));
493 // now we wait until we are done...
494 fresulta
.get (resulta
);
495 ACE_DEBUG ((LM_DEBUG
, " (%t) result %u\n", (u_int
) resulta
));
498 // @@ Can we safely delete worker_a here?
502 ACE_TMAIN (int, ACE_TCHAR
*[])
504 int n_iterations
= determine_iterations ();
506 test_active_object (n_iterations
);
507 test_cancellation (n_iterations
);
508 test_timeout (n_iterations
);
510 ACE_DEBUG ((LM_DEBUG
," (%t) that's all folks!\n"));
518 ACE_TMAIN (int, ACE_TCHAR
*[])
520 ACE_ERROR ((LM_ERROR
, "threads not supported on this platform\n"));
523 #endif /* ACE_HAS_THREADS */