Merge pull request #2309 from mitza-oci/warnings
[ACE_TAO.git] / ACE / examples / Bounded_Packet_Relay / Thread_Bounded_Packet_Relay.cpp
blob3d33608a4333a9ae80794068910746d98b84e359
2 //=============================================================================
3 /**
4 * @file Thread_Bounded_Packet_Relay.cpp
6 * Method definitions for the threaded-bounded packet relay class.
8 * @author Chris Gill <cdgill@cs.wustl.edu> and Douglas C. Schmidt <d.schmidt@vanderbilt.edu> Based on the Timer Queue Test example written by Carlos O'Ryan <coryan@cs.wustl.edu> and Douglas C. Schmidt <d.schmidt@vanderbilt.edu> and Sergio Flores-Gaitan <sergio@cs.wustl.edu>
9 */
10 //=============================================================================
13 #include "ace/OS_NS_string.h"
14 #include "ace/OS_NS_sys_time.h"
15 #include "ace/Condition_T.h"
16 #include "ace/Null_Mutex.h"
18 #include "Thread_Bounded_Packet_Relay.h"
20 typedef Thread_Bounded_Packet_Relay_Driver::MYCOMMAND DRIVER_CMD;
21 typedef ACE_Command_Callback<BPR_Handler_Base, BPR_Handler_Base::ACTION> HANDLER_CMD;
22 typedef ACE_Command_Callback<Send_Handler, Send_Handler::ACTION> SEND_HANDLER_CMD;
25 // Constructor.
27 Text_Input_Device_Wrapper::Text_Input_Device_Wrapper (ACE_Thread_Manager *input_task_mgr,
28 size_t read_length,
29 const char* text,
30 int logging)
31 : Input_Device_Wrapper_Base (input_task_mgr),
32 read_length_ (read_length),
33 text_ (text),
34 index_ (0),
35 logging_ (logging),
36 packet_count_ (0)
41 // Destructor.
43 Text_Input_Device_Wrapper::~Text_Input_Device_Wrapper ()
47 // Modifies device settings based on passed pointer to a u_long.
49 int
50 Text_Input_Device_Wrapper::modify_device_settings (void *logging)
52 packet_count_ = 0;
54 if (logging)
55 logging_ = *static_cast<int *> (logging);
56 else
57 ACE_ERROR_RETURN ((LM_ERROR,
58 "Text_Input_Device_Wrapper::modify_device_settings: "
59 "null argument"),
60 -1);
61 return 0;
65 // Creates a new message block, carrying data
66 // read from the underlying input device.
68 ACE_Message_Block *
69 Text_Input_Device_Wrapper::create_input_message ()
71 // Construct a new message block to send.
72 ACE_Message_Block *mb = 0;
73 ACE_NEW_RETURN (mb,
74 ACE_Message_Block (read_length_),
75 0);
77 // Zero out a "read" buffer to hold data.
78 char read_buf [BUFSIZ];
79 ACE_OS::memset (read_buf, 0, BUFSIZ);
81 // Loop through the text, filling in data to copy into the read
82 // buffer (leaving room for a terminating zero).
83 for (size_t i = 0; i < read_length_ - 1; ++i)
85 read_buf [i] = text_ [index_];
86 index_ = (index_ + 1) % ACE_OS::strlen (text_);
89 // Copy buf into the Message_Block and update the wr_ptr ().
90 if (mb->copy (read_buf, read_length_) < 0)
92 delete mb;
93 ACE_ERROR_RETURN ((LM_ERROR,
94 "read buffer copy failed"),
95 0);
98 // log packet creation if logging is turned on
99 if (logging_ & Text_Input_Device_Wrapper::LOG_MSGS_CREATED)
101 ++packet_count_;
102 ACE_DEBUG ((LM_DEBUG, "input message %d created\n",
103 packet_count_));
106 return mb;
109 // Constructor.
111 Text_Output_Device_Wrapper::Text_Output_Device_Wrapper (int logging)
112 : logging_ (logging)
116 // Consume and possibly print out the passed message.
119 Text_Output_Device_Wrapper::write_output_message (void *message)
121 if (message)
123 ++packet_count_;
125 if (logging_ & Text_Output_Device_Wrapper::LOG_MSGS_RCVD)
126 ACE_DEBUG ((LM_DEBUG, "output message %d received\n",
127 packet_count_));
129 if (logging_ & Text_Output_Device_Wrapper::PRINT_MSGS_RCVD)
130 ACE_DEBUG ((LM_DEBUG, "output message %d:\n[%s]\n",
131 packet_count_,
132 static_cast<ACE_Message_Block *> (message)->rd_ptr ()));
134 delete static_cast<ACE_Message_Block *> (message);
135 return 0;
137 ACE_ERROR_RETURN ((LM_ERROR,
138 "Text_Output_Device_Wrapper::"
139 "write_output_message: null argument"), -1);
142 // Modifies device settings based on passed pointer to a u_long.
145 Text_Output_Device_Wrapper::modify_device_settings (void *logging)
147 packet_count_ = 0;
149 if (logging)
150 logging_ = *static_cast<int *> (logging);
151 else
152 ACE_ERROR_RETURN ((LM_ERROR,
153 "Text_Output_Device_Wrapper::modify_device_settings: "
154 "null argument"),
155 -1);
156 return 0;
159 // Constructor.
161 User_Input_Task::User_Input_Task (Bounded_Packet_Relay *relay,
162 Thread_Timer_Queue *queue,
163 Thread_Bounded_Packet_Relay_Driver &tbprd)
164 : ACE_Task_Base (ACE_Thread_Manager::instance ()),
165 usecs_ (ACE_ONE_SECOND_IN_USECS),
166 relay_ (relay),
167 queue_ (queue),
168 driver_ (tbprd)
172 // Destructor.
174 User_Input_Task::~User_Input_Task ()
176 this->clear_all_timers ();
179 // Runs the main event loop.
182 User_Input_Task::svc ()
184 for (;;)
185 // Call back to the driver's implementation of how to read and
186 // parse input.
187 if (this->driver_.get_next_request () == -1)
188 break;
190 // We are done.
191 this->relay_->end_transmission (Bounded_Packet_Relay::CANCELLED);
192 this->queue_->deactivate ();
193 ACE_DEBUG ((LM_DEBUG,
194 "terminating user input thread\n"));
195 this->clear_all_timers ();
196 return 0;
199 // Sets the number of packets for the next transmission.
202 User_Input_Task::set_packet_count (void *argument)
204 if (argument)
206 driver_.packet_count (*static_cast<int *> (argument));
207 return 0;
209 ACE_ERROR_RETURN ((LM_ERROR,
210 "User_Input_Task::set_packet_count: null argument"),
211 -1);
214 // Sets the input device packet arrival period (usecs) for the next
215 // transmission.
218 User_Input_Task::set_arrival_period (void *argument)
220 if (argument)
222 driver_.arrival_period (*static_cast<int *> (argument));
223 return 0;
225 ACE_ERROR_RETURN ((LM_ERROR,
226 "User_Input_Task::set_arrival_period: null argument"),
227 -1);
230 // Sets the period between output device sends (usecs) for the next
231 // transmission.
234 User_Input_Task::set_send_period (void *argument)
236 if (argument)
238 driver_.send_period (*static_cast<int *> (argument));
239 return 0;
241 ACE_ERROR_RETURN ((LM_ERROR,
242 "User_Input_Task::set_send_period: null argument"),
243 -1);
246 // Sets a limit on the transmission duration (usecs).
249 User_Input_Task::set_duration_limit (void *argument)
251 if (argument)
253 driver_.duration_limit (*static_cast<int *> (argument));
254 return 0;
256 ACE_ERROR_RETURN ((LM_ERROR,
257 "User_Input_Task::set_duration_limit: null argument"),
258 -1);
261 // Sets logging level (0 or 1) for output device for the next
262 // transmission.
265 User_Input_Task::set_logging_level (void *argument)
267 if (argument)
269 driver_.logging_level (*static_cast<int *> (argument));
270 return 0;
272 ACE_ERROR_RETURN ((LM_ERROR,
273 "User_Input_Task::set_logging_level: null argument"),
274 -1);
277 // Runs the next transmission (if one is not in progress).
280 User_Input_Task::run_transmission (void *)
282 if (relay_)
284 switch (relay_->start_transmission (driver_.packet_count (),
285 driver_.arrival_period (),
286 driver_.logging_level ()))
288 case 1:
289 ACE_DEBUG ((LM_DEBUG,
290 "\nRun transmission: "
291 " Transmission already in progress\n"));
292 return 0;
293 /* NOTREACHED */
294 case 0:
296 ACE_Time_Value now = ACE_OS::gettimeofday ();
297 ACE_Time_Value send_every (0, driver_.send_period ());
298 ACE_Time_Value send_at (send_every + now);
300 Send_Handler *send_handler;
301 ACE_NEW_RETURN (send_handler,
302 Send_Handler (driver_.packet_count (),
303 send_every,
304 *relay_,
305 *queue_,
306 driver_),
307 -1);
308 if (queue_->schedule (send_handler, 0, send_at) < 0)
310 delete send_handler;
311 ACE_ERROR_RETURN ((LM_ERROR,
312 "User_Input_Task::run_transmission: "
313 "failed to schedule send handler"),
314 -1);
316 if (driver_.duration_limit ())
318 ACE_Time_Value terminate_at (0, driver_.duration_limit ());
319 terminate_at += now;
321 Termination_Handler *termination_handler;
323 termination_handler =
324 new Termination_Handler (*relay_,
325 *queue_,
326 driver_);
328 if (! termination_handler)
330 this->clear_all_timers ();
331 ACE_ERROR_RETURN ((LM_ERROR,
332 "User_Input_Task::run_transmission: "
333 "failed to allocate termination "
334 "handler"),
335 -1);
337 if (queue_->schedule (termination_handler,
338 0, terminate_at) < 0)
340 delete termination_handler;
341 this->clear_all_timers ();
342 ACE_ERROR_RETURN ((LM_ERROR,
343 "User_Input_Task::run_transmission: "
344 "failed to schedule termination "
345 "handler"),
346 -1);
349 return 0;
351 /* NOTREACHED */
352 default:
353 return -1;
354 /* NOTREACHED */
357 ACE_ERROR_RETURN ((LM_ERROR,
358 "User_Input_Task::run_transmission: "
359 "relay not instantiated"),
360 -1);
363 // Ends the current transmission (if one is in progress).
366 User_Input_Task::end_transmission (void *)
368 if (relay_)
370 switch (relay_->end_transmission (Bounded_Packet_Relay::CANCELLED))
372 case 1:
373 ACE_DEBUG ((LM_DEBUG,
374 "\nEnd transmission: "
375 "no transmission in progress\n"));
376 ACE_FALLTHROUGH;
377 case 0:
378 // Cancel any remaining timers.
379 this->clear_all_timers ();
380 return 0;
381 /* NOTREACHED */
382 default:
383 return -1;
384 /* NOTREACHED */
387 ACE_ERROR_RETURN ((LM_ERROR,
388 "User_Input_Task::end_transmission: "
389 "relay not instantiated"),
390 -1);
393 // Reports statistics for the previous transmission
394 // (if one is not in progress).
397 User_Input_Task::report_stats (void *)
399 if (relay_)
401 switch (relay_->report_statistics ())
403 case 1:
404 ACE_DEBUG ((LM_DEBUG,
405 "\nRun transmission: "
406 "\ntransmission already in progress\n"));
407 return 0;
408 /* NOTREACHED */
410 case 0:
411 this->clear_all_timers ();
412 return 0;
413 /* NOTREACHED */
415 default:
416 return -1;
417 /* NOTREACHED */
420 ACE_ERROR_RETURN ((LM_ERROR,
421 "User_Input_Task::report_stats: "
422 "relay not instantiated"),
423 -1);
426 // Shut down the task.
429 User_Input_Task::shutdown (void *)
431 // Clear any outstanding timers.
432 this->clear_all_timers ();
434 #if !defined (ACE_LACKS_PTHREAD_CANCEL)
435 // Cancel the thread timer queue task "preemptively."
436 ACE_Thread::cancel (this->queue_->thr_id ());
437 #else
438 // Cancel the thread timer queue task "voluntarily."
439 this->queue_->deactivate ();
440 #endif /* ACE_LACKS_PTHREAD_CANCEL */
442 // -1 indicates we are shutting down the application.
443 return -1;
446 // Helper method: clears all timers.
449 User_Input_Task::clear_all_timers ()
451 // loop through the timers in the queue, cancelling each one
452 for (ACE_Timer_Node_T <ACE_Event_Handler *> *node;
453 (node = queue_->timer_queue ()->get_first ()) != 0;
455 queue_->timer_queue ()->cancel (node->get_timer_id (), 0, 0);
457 return 0;
460 // Constructor.
462 BPR_Handler_Base::BPR_Handler_Base (Bounded_Packet_Relay &relay,
463 Thread_Timer_Queue &queue)
464 : relay_ (relay),
465 queue_ (queue)
469 // Destructor.
471 BPR_Handler_Base::~BPR_Handler_Base ()
475 // Helper method: clears all timers.
478 BPR_Handler_Base::clear_all_timers (void *)
480 // Loop through the timers in the queue, cancelling each one.
482 for (ACE_Timer_Node_T <ACE_Event_Handler *> *node;
483 (node = queue_.timer_queue ()->get_first ()) != 0;
485 queue_.timer_queue ()->cancel (node->get_timer_id (), 0, 0);
486 // queue_.cancel (node->get_timer_id (), 0);
488 // Invoke the handler's (virtual) destructor
489 delete this;
491 return 0;
494 // Constructor.
496 Send_Handler::Send_Handler (u_long send_count,
497 const ACE_Time_Value &duration,
498 Bounded_Packet_Relay &relay,
499 Thread_Timer_Queue &queue,
500 Thread_Bounded_Packet_Relay_Driver &driver)
501 : BPR_Handler_Base (relay, queue),
502 send_count_ (send_count),
503 duration_ (duration),
504 driver_ (driver)
508 // Destructor.
510 Send_Handler::~Send_Handler ()
514 // Call back hook.
517 Send_Handler::handle_timeout (const ACE_Time_Value &,
518 const void *)
520 switch (relay_.send_input ())
522 case 0:
523 // Decrement count of packets to relay.
524 --send_count_;
525 ACE_FALLTHROUGH;
526 case 1:
527 if (send_count_ > 0)
529 // Enqueue a deferred callback to the reregister command.
530 SEND_HANDLER_CMD *re_register_callback_;
531 ACE_NEW_RETURN (re_register_callback_,
532 SEND_HANDLER_CMD (*this,
533 &Send_Handler::reregister),
534 -1);
535 return queue_.enqueue_command (re_register_callback_);
537 else
539 // All packets are sent, time to end the transmission, redisplay
540 // the user menu, cancel any other timers, and go away.
541 relay_.end_transmission (Bounded_Packet_Relay::COMPLETED);
542 driver_.display_menu ();
544 // Enqueue a deferred callback to the clear_all_timers command.
545 HANDLER_CMD *clear_timers_callback_;
546 ACE_NEW_RETURN (clear_timers_callback_,
547 HANDLER_CMD (*this,
548 &BPR_Handler_Base::clear_all_timers),
549 -1);
550 return queue_.enqueue_command (clear_timers_callback_);
552 /* NOTREACHED */
553 default:
554 return -1;
558 // Cancellation hook.
561 Send_Handler::cancelled ()
563 delete this;
564 return 0;
567 // Helper method: re-registers this timer
570 Send_Handler::reregister (void *)
572 // Re-register the handler for a new timeout.
573 if (queue_.schedule (this,
575 duration_ + ACE_OS::gettimeofday ()) < 0)
576 ACE_ERROR_RETURN ((LM_ERROR,
577 "Send_Handler::reregister: "
578 "failed to reschedule send handler"),
579 -1);
581 return 0;
585 // Constructor.
587 Termination_Handler::Termination_Handler (Bounded_Packet_Relay &relay,
588 Thread_Timer_Queue &queue,
589 Thread_Bounded_Packet_Relay_Driver &driver)
590 : BPR_Handler_Base (relay, queue),
591 driver_ (driver)
595 // Destructor.
597 Termination_Handler::~Termination_Handler ()
601 // Call back hook.
604 Termination_Handler::handle_timeout (const ACE_Time_Value &,
605 const void *)
607 // Transmission timed out, so end the transmission, display the user
608 // menu, and register a callback to clear the timer queue and then
609 // make this object go away.
610 relay_.end_transmission (Bounded_Packet_Relay::TIMED_OUT);
611 driver_.display_menu ();
613 // Enqueue a deferred callback to the clear_all_timers command.
614 HANDLER_CMD *clear_timers_callback_;
615 ACE_NEW_RETURN (clear_timers_callback_,
616 HANDLER_CMD (*this,
617 &BPR_Handler_Base::clear_all_timers),
618 -1);
619 return queue_.enqueue_command (clear_timers_callback_);
622 // Cancellation hook
625 Termination_Handler::cancelled ()
627 delete this;
628 return 0;
631 // Constructor.
633 Thread_Bounded_Packet_Relay_Driver::Thread_Bounded_Packet_Relay_Driver (Bounded_Packet_Relay *relay)
634 : input_task_ (relay, &timer_queue_, *this)
638 // Destructor.
640 Thread_Bounded_Packet_Relay_Driver::~Thread_Bounded_Packet_Relay_Driver ()
644 // Display the user menu.
647 Thread_Bounded_Packet_Relay_Driver::display_menu ()
649 static char menu[] =
650 "\n\n Options:\n"
651 " ----------------------------------------------------------------------\n"
652 " 1 <number of packets to relay in one transmission = %d>\n"
653 " min = 1 packet.\n"
654 " 2 <input packet arrival period (in usec) = %d>\n"
655 " min = 1.\n"
656 " 3 <output packet transmission period (in usec) = %d>\n"
657 " min = 1.\n"
658 " 4 <limit on duration of transmission (in usec) = %d>\n"
659 " min = 1, no limit = 0.\n"
660 " 5 <logging level flags = %d>\n"
661 " no logging = 0,\n"
662 " log packets created by input device = 1,\n"
663 " log packets consumed by output device = 2,\n"
664 " logging options 1,2 = 3,\n"
665 " print contents of packets consumed by output put device = 4,\n"
666 " logging options 1,4 = 5,\n"
667 " logging options 2,4 = 6,\n"
668 " logging options 1,2,4 = 7.\n"
669 " ----------------------------------------------------------------------\n"
670 " 6 - runs a transmission using the current settings\n"
671 " 7 - cancels a transmission (if there is one running)\n"
672 " 8 - reports statistics from the most recent transmission\n"
673 " 9 - quits the program\n"
674 " ----------------------------------------------------------------------\n"
675 " Please enter your choice: ";
677 ACE_DEBUG ((LM_DEBUG,
678 menu,
679 this->packet_count (),
680 this->arrival_period (),
681 this->send_period (),
682 this->duration_limit (),
683 this->logging_level ()));
685 return 0;
688 // Initialize the driver.
691 Thread_Bounded_Packet_Relay_Driver::init ()
693 // Initialize the <Command> objects with their corresponding
694 // methods from <User_Input_Task>.
695 ACE_NEW_RETURN (packet_count_cmd_,
696 DRIVER_CMD (input_task_,
697 &User_Input_Task::set_packet_count),
698 -1);
699 ACE_NEW_RETURN (arrival_period_cmd_,
700 DRIVER_CMD (input_task_,
701 &User_Input_Task::set_arrival_period),
702 -1);
703 ACE_NEW_RETURN (transmit_period_cmd_,
704 DRIVER_CMD (input_task_,
705 &User_Input_Task::set_send_period),
706 -1);
707 ACE_NEW_RETURN (duration_limit_cmd_,
708 DRIVER_CMD (input_task_,
709 &User_Input_Task::set_duration_limit),
710 -1);
711 ACE_NEW_RETURN (logging_level_cmd_,
712 DRIVER_CMD (input_task_,
713 &User_Input_Task::set_logging_level),
714 -1);
715 ACE_NEW_RETURN (run_transmission_cmd_,
716 DRIVER_CMD (input_task_,
717 &User_Input_Task::run_transmission),
718 -1);
719 ACE_NEW_RETURN (cancel_transmission_cmd_,
720 DRIVER_CMD (input_task_,
721 &User_Input_Task::end_transmission),
722 -1);
723 ACE_NEW_RETURN (report_stats_cmd_,
724 DRIVER_CMD (input_task_,
725 &User_Input_Task::report_stats),
726 -1);
727 ACE_NEW_RETURN (shutdown_cmd_,
728 DRIVER_CMD (input_task_,
729 &User_Input_Task::shutdown),
730 -1);
731 if (this->input_task_.activate () == -1)
732 ACE_ERROR_RETURN ((LM_ERROR,
733 "cannot activate input task"),
734 -1);
735 else if (this->timer_queue_.activate () == -1)
736 ACE_ERROR_RETURN ((LM_ERROR,
737 "cannot activate timer queue"),
738 -1);
739 else if (ACE_Thread_Manager::instance ()->wait () == -1)
740 ACE_ERROR_RETURN ((LM_ERROR,
741 "wait on Thread_Manager failed"),
742 -1);
743 return 0;
746 // Run the driver
749 Thread_Bounded_Packet_Relay_Driver::run ()
751 this->init ();
752 return 0;