2 //=============================================================================
4 * @file Thread_Bounded_Packet_Relay.cpp
6 * Method definitions for the threaded-bounded packet relay class.
8 * @author Chris Gill <cdgill@cs.wustl.edu> and Douglas C. Schmidt <d.schmidt@vanderbilt.edu>. Based on the Timer Queue Test example written by Carlos O'Ryan <coryan@cs.wustl.edu>, Douglas C. Schmidt <d.schmidt@vanderbilt.edu>, and Sergio Flores-Gaitan <sergio@cs.wustl.edu>.
10 //=============================================================================
13 #include "ace/OS_NS_string.h"
14 #include "ace/OS_NS_sys_time.h"
15 #include "ace/Condition_T.h"
16 #include "ace/Null_Mutex.h"
18 #include "Thread_Bounded_Packet_Relay.h"
20 typedef Thread_Bounded_Packet_Relay_Driver::MYCOMMAND DRIVER_CMD
;
21 typedef ACE_Command_Callback
<BPR_Handler_Base
, BPR_Handler_Base::ACTION
> HANDLER_CMD
;
22 typedef ACE_Command_Callback
<Send_Handler
, Send_Handler::ACTION
> SEND_HANDLER_CMD
;
27 Text_Input_Device_Wrapper::Text_Input_Device_Wrapper (ACE_Thread_Manager
*input_task_mgr
,
31 : Input_Device_Wrapper_Base (input_task_mgr
),
32 read_length_ (read_length
),
43 Text_Input_Device_Wrapper::~Text_Input_Device_Wrapper ()
47 // Modifies device settings based on a passed pointer to an int holding the logging flags.
50 Text_Input_Device_Wrapper::modify_device_settings (void *logging
)
55 logging_
= *static_cast<int *> (logging
);
57 ACE_ERROR_RETURN ((LM_ERROR
,
58 "Text_Input_Device_Wrapper::modify_device_settings: "
65 // Creates a new message block, carrying data
66 // read from the underlying input device.
69 Text_Input_Device_Wrapper::create_input_message ()
71 // Construct a new message block to send.
72 ACE_Message_Block
*mb
= 0;
74 ACE_Message_Block (read_length_
),
77 // Zero out a "read" buffer to hold data.
78 char read_buf
[BUFSIZ
];
79 ACE_OS::memset (read_buf
, 0, BUFSIZ
);
81 // Loop through the text, filling in data to copy into the read
82 // buffer (leaving room for a terminating zero).
83 for (size_t i
= 0; i
< read_length_
- 1; ++i
)
85 read_buf
[i
] = text_
[index_
];
86 index_
= (index_
+ 1) % ACE_OS::strlen (text_
);
89 // Copy buf into the Message_Block and update the wr_ptr ().
90 if (mb
->copy (read_buf
, read_length_
) < 0)
93 ACE_ERROR_RETURN ((LM_ERROR
,
94 "read buffer copy failed"),
98 // Log packet creation if the LOG_MSGS_CREATED logging flag is set.
99 if (logging_
& Text_Input_Device_Wrapper::LOG_MSGS_CREATED
)
102 ACE_DEBUG ((LM_DEBUG
, "input message %d created\n",
111 Text_Output_Device_Wrapper::Text_Output_Device_Wrapper (int logging
)
116 // Consume and possibly print out the passed message.
119 Text_Output_Device_Wrapper::write_output_message (void *message
)
125 if (logging_
& Text_Output_Device_Wrapper::LOG_MSGS_RCVD
)
126 ACE_DEBUG ((LM_DEBUG
, "output message %d received\n",
129 if (logging_
& Text_Output_Device_Wrapper::PRINT_MSGS_RCVD
)
130 ACE_DEBUG ((LM_DEBUG
, "output message %d:\n[%s]\n",
132 static_cast<ACE_Message_Block
*> (message
)->rd_ptr ()));
134 delete static_cast<ACE_Message_Block
*> (message
);
137 ACE_ERROR_RETURN ((LM_ERROR
,
138 "Text_Output_Device_Wrapper::"
139 "write_output_message: null argument"), -1);
142 // Modifies device settings based on a passed pointer to an int holding the logging flags.
145 Text_Output_Device_Wrapper::modify_device_settings (void *logging
)
150 logging_
= *static_cast<int *> (logging
);
152 ACE_ERROR_RETURN ((LM_ERROR
,
153 "Text_Output_Device_Wrapper::modify_device_settings: "
161 User_Input_Task::User_Input_Task (Bounded_Packet_Relay
*relay
,
162 Thread_Timer_Queue
*queue
,
163 Thread_Bounded_Packet_Relay_Driver
&tbprd
)
164 : ACE_Task_Base (ACE_Thread_Manager::instance ()),
165 usecs_ (ACE_ONE_SECOND_IN_USECS
),
174 User_Input_Task::~User_Input_Task ()
176 this->clear_all_timers ();
179 // Runs the main event loop.
182 User_Input_Task::svc ()
185 // Call back to the driver's implementation of how to read and
187 if (this->driver_
.get_next_request () == -1)
191 this->relay_
->end_transmission (Bounded_Packet_Relay::CANCELLED
);
192 this->queue_
->deactivate ();
193 ACE_DEBUG ((LM_DEBUG
,
194 "terminating user input thread\n"));
195 this->clear_all_timers ();
199 // Sets the number of packets for the next transmission.
202 User_Input_Task::set_packet_count (void *argument
)
206 driver_
.packet_count (*static_cast<int *> (argument
));
209 ACE_ERROR_RETURN ((LM_ERROR
,
210 "User_Input_Task::set_packet_count: null argument"),
214 // Sets the input device packet arrival period (usecs) for the next
218 User_Input_Task::set_arrival_period (void *argument
)
222 driver_
.arrival_period (*static_cast<int *> (argument
));
225 ACE_ERROR_RETURN ((LM_ERROR
,
226 "User_Input_Task::set_arrival_period: null argument"),
230 // Sets the period between output device sends (usecs) for the next
234 User_Input_Task::set_send_period (void *argument
)
238 driver_
.send_period (*static_cast<int *> (argument
));
241 ACE_ERROR_RETURN ((LM_ERROR
,
242 "User_Input_Task::set_send_period: null argument"),
246 // Sets a limit on the transmission duration (usecs).
249 User_Input_Task::set_duration_limit (void *argument
)
253 driver_
.duration_limit (*static_cast<int *> (argument
));
256 ACE_ERROR_RETURN ((LM_ERROR
,
257 "User_Input_Task::set_duration_limit: null argument"),
261 // Sets logging level flags (bitwise OR of 1, 2 and 4; see the user menu) for the input and output devices for the next
265 User_Input_Task::set_logging_level (void *argument
)
269 driver_
.logging_level (*static_cast<int *> (argument
));
272 ACE_ERROR_RETURN ((LM_ERROR
,
273 "User_Input_Task::set_logging_level: null argument"),
277 // Runs the next transmission (if one is not in progress).
280 User_Input_Task::run_transmission (void *)
284 switch (relay_
->start_transmission (driver_
.packet_count (),
285 driver_
.arrival_period (),
286 driver_
.logging_level ()))
289 ACE_DEBUG ((LM_DEBUG
,
290 "\nRun transmission: "
291 " Transmission already in progress\n"));
296 ACE_Time_Value now
= ACE_OS::gettimeofday ();
297 ACE_Time_Value
send_every (0, driver_
.send_period ());
298 ACE_Time_Value
send_at (send_every
+ now
);
300 Send_Handler
*send_handler
;
301 ACE_NEW_RETURN (send_handler
,
302 Send_Handler (driver_
.packet_count (),
308 if (queue_
->schedule (send_handler
, 0, send_at
) < 0)
311 ACE_ERROR_RETURN ((LM_ERROR
,
312 "User_Input_Task::run_transmission: "
313 "failed to schedule send handler"),
316 if (driver_
.duration_limit ())
318 ACE_Time_Value
terminate_at (0, driver_
.duration_limit ());
321 Termination_Handler
*termination_handler
;
323 termination_handler
=
324 new Termination_Handler (*relay_
,
328 if (! termination_handler
)
330 this->clear_all_timers ();
331 ACE_ERROR_RETURN ((LM_ERROR
,
332 "User_Input_Task::run_transmission: "
333 "failed to allocate termination "
337 if (queue_
->schedule (termination_handler
,
338 0, terminate_at
) < 0)
340 delete termination_handler
;
341 this->clear_all_timers ();
342 ACE_ERROR_RETURN ((LM_ERROR
,
343 "User_Input_Task::run_transmission: "
344 "failed to schedule termination "
357 ACE_ERROR_RETURN ((LM_ERROR
,
358 "User_Input_Task::run_transmission: "
359 "relay not instantiated"),
363 // Ends the current transmission (if one is in progress).
366 User_Input_Task::end_transmission (void *)
370 switch (relay_
->end_transmission (Bounded_Packet_Relay::CANCELLED
))
373 ACE_DEBUG ((LM_DEBUG
,
374 "\nEnd transmission: "
375 "no transmission in progress\n"));
378 // Cancel any remaining timers.
379 this->clear_all_timers ();
387 ACE_ERROR_RETURN ((LM_ERROR
,
388 "User_Input_Task::end_transmission: "
389 "relay not instantiated"),
393 // Reports statistics for the previous transmission
394 // (if one is not in progress).
397 User_Input_Task::report_stats (void *)
401 switch (relay_
->report_statistics ())
404 ACE_DEBUG ((LM_DEBUG
,
405 "\nRun transmission: "
406 "\ntransmission already in progress\n"));
411 this->clear_all_timers ();
420 ACE_ERROR_RETURN ((LM_ERROR
,
421 "User_Input_Task::report_stats: "
422 "relay not instantiated"),
426 // Shut down the task.
429 User_Input_Task::shutdown (void *)
431 // Clear any outstanding timers.
432 this->clear_all_timers ();
434 #if !defined (ACE_LACKS_PTHREAD_CANCEL)
435 // Cancel the thread timer queue task "preemptively."
436 ACE_Thread::cancel (this->queue_
->thr_id ());
438 // Cancel the thread timer queue task "voluntarily."
439 this->queue_
->deactivate ();
440 #endif /* ACE_LACKS_PTHREAD_CANCEL */
442 // -1 indicates we are shutting down the application.
446 // Helper method: clears all timers.
449 User_Input_Task::clear_all_timers ()
451 // Loop through the timers in the queue, cancelling each one.
452 for (ACE_Timer_Node_T
<ACE_Event_Handler
*> *node
;
453 (node
= queue_
->timer_queue ()->get_first ()) != 0;
455 queue_
->timer_queue ()->cancel (node
->get_timer_id (), 0, 0);
462 BPR_Handler_Base::BPR_Handler_Base (Bounded_Packet_Relay
&relay
,
463 Thread_Timer_Queue
&queue
)
471 BPR_Handler_Base::~BPR_Handler_Base ()
475 // Helper method: clears all timers.
478 BPR_Handler_Base::clear_all_timers (void *)
480 // Loop through the timers in the queue, cancelling each one.
482 for (ACE_Timer_Node_T
<ACE_Event_Handler
*> *node
;
483 (node
= queue_
.timer_queue ()->get_first ()) != 0;
485 queue_
.timer_queue ()->cancel (node
->get_timer_id (), 0, 0);
486 // queue_.cancel (node->get_timer_id (), 0);
488 // Invoke the handler's (virtual) destructor
496 Send_Handler::Send_Handler (u_long send_count
,
497 const ACE_Time_Value
&duration
,
498 Bounded_Packet_Relay
&relay
,
499 Thread_Timer_Queue
&queue
,
500 Thread_Bounded_Packet_Relay_Driver
&driver
)
501 : BPR_Handler_Base (relay
, queue
),
502 send_count_ (send_count
),
503 duration_ (duration
),
510 Send_Handler::~Send_Handler ()
517 Send_Handler::handle_timeout (const ACE_Time_Value
&,
520 switch (relay_
.send_input ())
523 // Decrement count of packets to relay.
529 // Enqueue a deferred callback to the reregister command.
530 SEND_HANDLER_CMD
*re_register_callback_
;
531 ACE_NEW_RETURN (re_register_callback_
,
532 SEND_HANDLER_CMD (*this,
533 &Send_Handler::reregister
),
535 return queue_
.enqueue_command (re_register_callback_
);
539 // All packets are sent, time to end the transmission, redisplay
540 // the user menu, cancel any other timers, and go away.
541 relay_
.end_transmission (Bounded_Packet_Relay::COMPLETED
);
542 driver_
.display_menu ();
544 // Enqueue a deferred callback to the clear_all_timers command.
545 HANDLER_CMD
*clear_timers_callback_
;
546 ACE_NEW_RETURN (clear_timers_callback_
,
548 &BPR_Handler_Base::clear_all_timers
),
550 return queue_
.enqueue_command (clear_timers_callback_
);
558 // Cancellation hook.
561 Send_Handler::cancelled ()
567 // Helper method: re-registers this timer
570 Send_Handler::reregister (void *)
572 // Re-register the handler for a new timeout.
573 if (queue_
.schedule (this,
575 duration_
+ ACE_OS::gettimeofday ()) < 0)
576 ACE_ERROR_RETURN ((LM_ERROR
,
577 "Send_Handler::reregister: "
578 "failed to reschedule send handler"),
587 Termination_Handler::Termination_Handler (Bounded_Packet_Relay
&relay
,
588 Thread_Timer_Queue
&queue
,
589 Thread_Bounded_Packet_Relay_Driver
&driver
)
590 : BPR_Handler_Base (relay
, queue
),
597 Termination_Handler::~Termination_Handler ()
604 Termination_Handler::handle_timeout (const ACE_Time_Value
&,
607 // Transmission timed out, so end the transmission, display the user
608 // menu, and register a callback to clear the timer queue and then
609 // make this object go away.
610 relay_
.end_transmission (Bounded_Packet_Relay::TIMED_OUT
);
611 driver_
.display_menu ();
613 // Enqueue a deferred callback to the clear_all_timers command.
614 HANDLER_CMD
*clear_timers_callback_
;
615 ACE_NEW_RETURN (clear_timers_callback_
,
617 &BPR_Handler_Base::clear_all_timers
),
619 return queue_
.enqueue_command (clear_timers_callback_
);
625 Termination_Handler::cancelled ()
633 Thread_Bounded_Packet_Relay_Driver::Thread_Bounded_Packet_Relay_Driver (Bounded_Packet_Relay
*relay
)
634 : input_task_ (relay
, &timer_queue_
, *this)
640 Thread_Bounded_Packet_Relay_Driver::~Thread_Bounded_Packet_Relay_Driver ()
644 // Display the user menu.
647 Thread_Bounded_Packet_Relay_Driver::display_menu ()
651 " ----------------------------------------------------------------------\n"
652 " 1 <number of packets to relay in one transmission = %d>\n"
654 " 2 <input packet arrival period (in usec) = %d>\n"
656 " 3 <output packet transmission period (in usec) = %d>\n"
658 " 4 <limit on duration of transmission (in usec) = %d>\n"
659 " min = 1, no limit = 0.\n"
660 " 5 <logging level flags = %d>\n"
662 " log packets created by input device = 1,\n"
663 " log packets consumed by output device = 2,\n"
664 " logging options 1,2 = 3,\n"
665 " print contents of packets consumed by output put device = 4,\n"
666 " logging options 1,4 = 5,\n"
667 " logging options 2,4 = 6,\n"
668 " logging options 1,2,4 = 7.\n"
669 " ----------------------------------------------------------------------\n"
670 " 6 - runs a transmission using the current settings\n"
671 " 7 - cancels a transmission (if there is one running)\n"
672 " 8 - reports statistics from the most recent transmission\n"
673 " 9 - quits the program\n"
674 " ----------------------------------------------------------------------\n"
675 " Please enter your choice: ";
677 ACE_DEBUG ((LM_DEBUG
,
679 this->packet_count (),
680 this->arrival_period (),
681 this->send_period (),
682 this->duration_limit (),
683 this->logging_level ()));
688 // Initialize the driver.
691 Thread_Bounded_Packet_Relay_Driver::init ()
693 // Initialize the <Command> objects with their corresponding
694 // methods from <User_Input_Task>.
695 ACE_NEW_RETURN (packet_count_cmd_
,
696 DRIVER_CMD (input_task_
,
697 &User_Input_Task::set_packet_count
),
699 ACE_NEW_RETURN (arrival_period_cmd_
,
700 DRIVER_CMD (input_task_
,
701 &User_Input_Task::set_arrival_period
),
703 ACE_NEW_RETURN (transmit_period_cmd_
,
704 DRIVER_CMD (input_task_
,
705 &User_Input_Task::set_send_period
),
707 ACE_NEW_RETURN (duration_limit_cmd_
,
708 DRIVER_CMD (input_task_
,
709 &User_Input_Task::set_duration_limit
),
711 ACE_NEW_RETURN (logging_level_cmd_
,
712 DRIVER_CMD (input_task_
,
713 &User_Input_Task::set_logging_level
),
715 ACE_NEW_RETURN (run_transmission_cmd_
,
716 DRIVER_CMD (input_task_
,
717 &User_Input_Task::run_transmission
),
719 ACE_NEW_RETURN (cancel_transmission_cmd_
,
720 DRIVER_CMD (input_task_
,
721 &User_Input_Task::end_transmission
),
723 ACE_NEW_RETURN (report_stats_cmd_
,
724 DRIVER_CMD (input_task_
,
725 &User_Input_Task::report_stats
),
727 ACE_NEW_RETURN (shutdown_cmd_
,
728 DRIVER_CMD (input_task_
,
729 &User_Input_Task::shutdown
),
731 if (this->input_task_
.activate () == -1)
732 ACE_ERROR_RETURN ((LM_ERROR
,
733 "cannot activate input task"),
735 else if (this->timer_queue_
.activate () == -1)
736 ACE_ERROR_RETURN ((LM_ERROR
,
737 "cannot activate timer queue"),
739 else if (ACE_Thread_Manager::instance ()->wait () == -1)
740 ACE_ERROR_RETURN ((LM_ERROR
,
741 "wait on Thread_Manager failed"),
749 Thread_Bounded_Packet_Relay_Driver::run ()