Merge pull request #2216 from jwillemsen/jwi-cxxversionchecks
[ACE_TAO.git] / ACE / examples / Bounded_Packet_Relay / BPR_Drivers.cpp
blob1c19a83e62e06bf4a7532de9439d820015412d7f
2 //=============================================================================
3 /**
4 * @file BPR_Drivers.cpp
6 * This code builds an abstraction to factor out common code for
7 * the different implementations of the Timer_Queue.
9 * @author Chris Gill <cdgill@cs.wustl.edu> and Douglas C. Schmidt <d.schmidt@vanderbilt.edu>. Based on the Timer Queue Test example written by Carlos O'Ryan <coryan@cs.wustl.edu>, Douglas C. Schmidt <d.schmidt@vanderbilt.edu>, and Sergio Flores-Gaitan <sergio@cs.wustl.edu>.
11 //=============================================================================
14 #include "ace/OS_NS_sys_time.h"
15 #include "BPR_Drivers.h"
18 // Constructor.
20 Input_Device_Wrapper_Base::Input_Device_Wrapper_Base (ACE_Thread_Manager *input_task_mgr)
21 : ACE_Task_Base (input_task_mgr),
22 send_input_msg_cmd_ (0),
23 input_period_ (ACE_ONE_SECOND_IN_USECS),
24 is_active_ (0),
25 send_count_ (0)
29 // Destructor.
31 Input_Device_Wrapper_Base::~Input_Device_Wrapper_Base ()
35 // Sets send input message command in the input device driver object.
37 int
38 Input_Device_Wrapper_Base::set_send_input_msg_cmd (ACE_Command_Base *send_input_msg_cmd)
40 // Set the new command. Input device is not responsible
41 // for deleting the old command, if any.
42 send_input_msg_cmd_ = send_input_msg_cmd;
43 return 0;
46 // Sets period between when input messages are produced.
48 int
49 Input_Device_Wrapper_Base::set_input_period (u_long input_period)
51 input_period_ = input_period;
52 return 0;
55 // Sets count of messages to send.
57 int
58 Input_Device_Wrapper_Base::set_send_count (long count)
60 send_count_ = count;
61 return 0;
64 // Request that the input device stop sending messages
65 // and terminate its thread. Should return 1 if it will do so, 0
66 // if it has already done so, or -1 if there is a problem doing so.
68 int
69 Input_Device_Wrapper_Base::request_stop ()
71 if (is_active_)
73 is_active_ = 0;
74 return 1;
77 return 0;
80 // This method runs the input device loop in the new thread.
82 int
83 Input_Device_Wrapper_Base::svc ()
85 ACE_Time_Value timeout;
86 ACE_Message_Block *message;
88 // Set a flag to indicate we're active.
89 is_active_ = 1;
91 // Start with the total count of messages to send.
92 for (current_count_ = send_count_;
93 // While we're still marked active, and there are packets to send.
94 (is_active_) && (current_count_ != 0);
97 // Create an input message to send.
98 message = create_input_message ();
99 if (message == 0)
101 if (is_active_)
103 is_active_ = 0;
104 ACE_ERROR_RETURN ((LM_ERROR, "%t %p\n",
105 "Failed to create input message object"),
106 -1);
109 break;
112 // Make sure there is a send command object.
113 if (send_input_msg_cmd_ == 0)
115 delete message;
116 if (is_active_)
118 is_active_ = 0;
119 ACE_ERROR_RETURN ((LM_ERROR, "%t %p\n",
120 "send message command object not instantiated"),
121 -1);
124 break;
127 // Send the input message.
128 if (send_input_msg_cmd_->execute ((void *) message) < 0)
130 delete message;
131 if (is_active_)
133 is_active_ = 0;
134 ACE_ERROR_RETURN ((LM_ERROR, "%t %p\n",
135 "Failed executing send message command object"),
136 -1);
139 break;
142 // If all went well, decrement count of messages to send, and
143 // run the reactor event loop unti we get a timeout or something
144 // happens in a registered upcall.
145 if (current_count_ > 0)
146 --current_count_;
148 timeout = ACE_Time_Value (0, input_period_);
149 reactor_.run_event_loop (timeout);
152 is_active_ = 0;
154 return 0;
157 // Sends a newly created message block, carrying data read from the
158 // underlying input device, by passing a pointer to the message block
159 // to its command execution.
162 Input_Device_Wrapper_Base::send_input_message (ACE_Message_Block *amb)
164 if (send_input_msg_cmd_)
165 return send_input_msg_cmd_->execute ((void *) amb);
166 else
168 if (is_active_)
169 ACE_ERROR ((LM_ERROR, "%t %p\n",
170 "Input_Device_Wrapper_Base::send_input_message: "
171 "command object not instantiated"));
173 return -1;
177 Output_Device_Wrapper_Base::~Output_Device_Wrapper_Base ()
181 // Constructor.
183 Bounded_Packet_Relay::Bounded_Packet_Relay (ACE_Thread_Manager *input_task_mgr,
184 Input_Device_Wrapper_Base *input_wrapper,
185 Output_Device_Wrapper_Base *output_wrapper)
186 : is_active_ (0),
187 input_task_mgr_ (input_task_mgr),
188 input_wrapper_ (input_wrapper),
189 output_wrapper_ (output_wrapper),
190 queue_ (Bounded_Packet_Relay::DEFAULT_HWM,
191 Bounded_Packet_Relay::DEFAULT_LWM),
192 queue_hwm_ (Bounded_Packet_Relay::DEFAULT_HWM),
193 queue_lwm_ (Bounded_Packet_Relay::DEFAULT_LWM),
194 transmission_number_ (0),
195 packets_sent_ (0),
196 status_ (Bounded_Packet_Relay::UN_INITIALIZED),
197 transmission_start_ (ACE_Time_Value::zero),
198 transmission_end_ (ACE_Time_Value::zero)
200 if (input_task_mgr_ == 0)
201 input_task_mgr_ = ACE_Thread_Manager::instance ();
204 // Destructor.
206 Bounded_Packet_Relay::~Bounded_Packet_Relay ()
208 // Reactivate the queue, and then clear it.
209 queue_.activate ();
210 while (! queue_.is_empty ())
212 ACE_Message_Block *msg;
213 queue_.dequeue_head (msg);
214 delete msg;
219 // Requests output be sent to output device.
222 Bounded_Packet_Relay::send_input ()
224 // Don't block, return immediately if queue is empty.
225 ACE_Message_Block *item;
227 // Using a separate (non-const) time value
228 // is necessary on some platforms
229 ACE_Time_Value immediate (ACE_Time_Value::zero);
231 if (queue_.dequeue_head (item,
232 &immediate) < 0)
233 return 1;
235 // If a message block was dequeued, send it to the output device.
237 if (output_wrapper_->write_output_message ((void *) item) < 0)
239 if (is_active_)
240 ACE_ERROR ((LM_ERROR,
241 "%t %p\n",
242 "failed to write to output device object"));
244 return -1;
247 // If all went OK, increase count of packets sent.
248 ++packets_sent_;
249 return 0;
252 // Requests a transmission be started.
255 Bounded_Packet_Relay::start_transmission (u_long packet_count,
256 u_long arrival_period,
257 int logging_level)
259 // Serialize access to start and end transmission calls, statistics
260 // reporting calls.
261 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->transmission_lock_, -1);
263 // If a transmission is already in progress, just return.
264 if (is_active_)
265 return 1;
267 // Set transmission in progress flag true.
268 is_active_ = 1;
270 // Update statistics for a new transmission.
271 ++transmission_number_;
272 packets_sent_ = 0;
273 status_ = STARTED;
274 transmission_start_ = ACE_OS::gettimeofday ();
276 // Reactivate the queue, and then clear it.
277 queue_.activate ();
278 while (! queue_.is_empty ())
280 ACE_Message_Block *msg;
281 queue_.dequeue_head (msg);
282 delete msg;
285 // Initialize the output device.
286 if (output_wrapper_->modify_device_settings ((void *) &logging_level) < 0)
288 status_ = ERROR_DETECTED;
289 transmission_end_ = ACE_OS::gettimeofday ();
290 is_active_ = 0;
291 ACE_ERROR_RETURN ((LM_ERROR, "%t %p\n",
292 "failed to initialize output device object"),
293 -1);
295 // Initialize the input device.
296 if (input_wrapper_->modify_device_settings ((void *) &logging_level) < 0)
298 status_ = ERROR_DETECTED;
299 transmission_end_ = ACE_OS::gettimeofday ();
300 is_active_ = 0;
301 ACE_ERROR_RETURN ((LM_ERROR, "%t %p\n",
302 "failed to initialize output device object"),
303 -1);
305 else if (input_wrapper_->set_input_period (arrival_period) < 0)
307 status_ = ERROR_DETECTED;
308 transmission_end_ = ACE_OS::gettimeofday ();
309 is_active_ = 0;
310 ACE_ERROR_RETURN ((LM_ERROR, "%t %p\n",
311 "failed to initialize input device object"),
312 -1);
314 else if (input_wrapper_->set_send_count (packet_count) < 0)
316 status_ = ERROR_DETECTED;
317 transmission_end_ = ACE_OS::gettimeofday ();
318 is_active_ = 0;
319 ACE_ERROR_RETURN ((LM_ERROR, "%t %p\n",
320 "failed to initialize input device object"),
321 -1);
323 // Activate the input device.
324 else if (input_wrapper_->activate () < 0)
326 status_ = ERROR_DETECTED;
327 transmission_end_ = ACE_OS::gettimeofday ();
328 is_active_ = 0;
329 ACE_ERROR_RETURN ((LM_ERROR, "%t %p\n",
330 "failed to activate input device object"),
331 -1);
334 // If all went well, print a startup message and return success.
335 ACE_DEBUG ((LM_DEBUG,
336 "\n\nTransmission %u started\n\n",
337 transmission_number_));
338 return 0;
341 // Requests a transmission be ended.
344 Bounded_Packet_Relay::end_transmission (Transmission_Status status)
346 // Serialize access to start and end transmission calls,
347 // statistics reporting calls.
348 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->transmission_lock_, -1);
350 // If a transmission is not already in progress, just return.
351 if (! is_active_)
352 return 1;
354 // Set transmission in progress flag false.
355 is_active_ = 0;
357 // Ask the the input thread to stop.
358 if (input_wrapper_->request_stop () < 0)
360 status_ = ERROR_DETECTED;
361 transmission_end_ = ACE_OS::gettimeofday ();
362 ACE_ERROR_RETURN ((LM_ERROR, "%t %p\n",
363 "failed asking input device thread to stop"),
364 -1);
367 // Deactivate the queue, allowing all waiting threads to continue.
368 queue_.deactivate ();
370 // Wait for input thread to stop.
371 input_task_mgr_->wait_task (input_wrapper_);
373 // Reactivate the queue, and then clear it.
374 queue_.activate ();
375 while (! queue_.is_empty ())
377 ACE_Message_Block *msg;
378 queue_.dequeue_head (msg);
379 delete msg;
382 // If all went well, set passed status, stamp end time, print a
383 // termination message, and return success.
384 status_ = status;
385 transmission_end_ = ACE_OS::gettimeofday ();
386 ACE_DEBUG ((LM_DEBUG,
387 "\n\nTransmission %u ended with status: %s\n\n",
388 transmission_number_, status_msg ()));
389 return 0;
392 // Requests a report of statistics from the last transmission.
395 Bounded_Packet_Relay::report_statistics ()
397 // Serialize access to start and end transmission calls,
398 // statistics reporting calls.
399 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->transmission_lock_, -1);
401 // If a transmission is already in progress, just return.
402 if (is_active_)
403 return 1;
405 // Calculate duration of trasmission.
406 ACE_Time_Value duration (transmission_end_);
407 duration -= transmission_start_;
409 // Report transmission statistics.
410 ACE_DEBUG ((LM_DEBUG,
411 "\n\nStatisics for transmission %u:\n\n"
412 "Transmission status: %s\n"
413 "Start time: %d (sec) %d (usec)\n"
414 "End time: %d (sec) %d (usec)\n"
415 "Duration: %d (sec) %d (usec)\n"
416 "Packets relayed: %u\n\n",
417 transmission_number_, status_msg (),
418 transmission_start_.sec (),
419 transmission_start_.usec (),
420 transmission_end_.sec (),
421 transmission_end_.usec (),
422 duration.sec (),
423 duration.usec (),
424 packets_sent_));
425 return 0;
428 // Public entry point to which to push input.
431 Bounded_Packet_Relay::receive_input (void * arg)
433 if (! arg)
435 if (is_active_)
436 ACE_ERROR ((LM_ERROR, "%t %p\n",
437 "Bounded_Packet_Relay::receive_input: "
438 "null argument"));
440 return -1;
442 ACE_Message_Block *message = static_cast<ACE_Message_Block *> (arg);
443 if (queue_.enqueue_tail (message) < 0)
445 if (is_active_)
446 ACE_ERROR ((LM_ERROR, "%t %p\n",
447 "Bounded_Packet_Relay::receive_input failed"));
449 return -1;
452 return 0;
455 // Get high water mark for relay queue.
457 ACE_UINT32
458 Bounded_Packet_Relay::queue_hwm ()
460 return queue_lwm_;
464 // Set high water mark for relay queue.
466 void
467 Bounded_Packet_Relay::queue_hwm (ACE_UINT32 hwm)
469 queue_hwm_ = hwm;
472 // Get low water mark for relay queue.
474 ACE_UINT32
475 Bounded_Packet_Relay::queue_lwm ()
477 return queue_lwm_;
480 // Set low water mark for relay queue.
482 void
483 Bounded_Packet_Relay::queue_lwm (ACE_UINT32 lwm)
485 queue_lwm_ = lwm;
489 // Returns string corresponding to current status.
491 const char *
492 Bounded_Packet_Relay::status_msg ()
494 const char *status_msg;
495 switch (status_)
497 case UN_INITIALIZED:
498 status_msg = "uninitialized";
499 break;
500 case STARTED:
501 status_msg = "in progress";
502 break;
503 case COMPLETED:
504 status_msg = "completed with all packets sent";
505 break;
506 case TIMED_OUT:
507 status_msg = "terminated by transmission duration timer";
508 break;
509 case CANCELLED:
510 status_msg = "cancelled by external control";
511 break;
512 case ERROR_DETECTED:
513 status_msg = "error was detected";
514 break;
515 default:
516 status_msg = "unknown transmission status";
517 break;
520 return status_msg;