2 //=============================================================================
4 * @file BPR_Drivers.cpp
6 * This code builds an abstraction to factor out common code for
7 * the different implementations of the Timer_Queue.
9 * @author Chris Gill <cdgill@cs.wustl.edu> and Douglas C. Schmidt <d.schmidt@vanderbilt.edu> Based on the Timer Queue Test example written by Carlos O'Ryan <coryan@cs.wustl.edu> and Douglas C. Schmidt <d.schmidt@vanderbilt.edu> and Sergio Flores-Gaitan <sergio@cs.wustl.edu>
11 //=============================================================================
14 #include "ace/OS_NS_sys_time.h"
15 #include "BPR_Drivers.h"
20 Input_Device_Wrapper_Base::Input_Device_Wrapper_Base (ACE_Thread_Manager
*input_task_mgr
)
21 : ACE_Task_Base (input_task_mgr
),
22 send_input_msg_cmd_ (0),
23 input_period_ (ACE_ONE_SECOND_IN_USECS
),
31 Input_Device_Wrapper_Base::~Input_Device_Wrapper_Base ()
35 // Sets send input message command in the input device driver object.
38 Input_Device_Wrapper_Base::set_send_input_msg_cmd (ACE_Command_Base
*send_input_msg_cmd
)
40 // Set the new command. Input device is not responsible
41 // for deleting the old command, if any.
42 send_input_msg_cmd_
= send_input_msg_cmd
;
46 // Sets period between when input messages are produced.
49 Input_Device_Wrapper_Base::set_input_period (u_long input_period
)
51 input_period_
= input_period
;
55 // Sets count of messages to send.
58 Input_Device_Wrapper_Base::set_send_count (long count
)
64 // Request that the input device stop sending messages
65 // and terminate its thread. Should return 1 if it will do so, 0
66 // if it has already done so, or -1 if there is a problem doing so.
69 Input_Device_Wrapper_Base::request_stop ()
80 // This method runs the input device loop in the new thread.
83 Input_Device_Wrapper_Base::svc ()
85 ACE_Time_Value timeout
;
86 ACE_Message_Block
*message
;
88 // Set a flag to indicate we're active.
91 // Start with the total count of messages to send.
92 for (current_count_
= send_count_
;
93 // While we're still marked active, and there are packets to send.
94 (is_active_
) && (current_count_
!= 0);
97 // Create an input message to send.
98 message
= create_input_message ();
104 ACE_ERROR_RETURN ((LM_ERROR
, "%t %p\n",
105 "Failed to create input message object"),
112 // Make sure there is a send command object.
113 if (send_input_msg_cmd_
== 0)
119 ACE_ERROR_RETURN ((LM_ERROR
, "%t %p\n",
120 "send message command object not instantiated"),
127 // Send the input message.
128 if (send_input_msg_cmd_
->execute ((void *) message
) < 0)
134 ACE_ERROR_RETURN ((LM_ERROR
, "%t %p\n",
135 "Failed executing send message command object"),
142 // If all went well, decrement count of messages to send, and
143 // run the reactor event loop unti we get a timeout or something
144 // happens in a registered upcall.
145 if (current_count_
> 0)
148 timeout
= ACE_Time_Value (0, input_period_
);
149 reactor_
.run_event_loop (timeout
);
157 // Sends a newly created message block, carrying data read from the
158 // underlying input device, by passing a pointer to the message block
159 // to its command execution.
162 Input_Device_Wrapper_Base::send_input_message (ACE_Message_Block
*amb
)
164 if (send_input_msg_cmd_
)
165 return send_input_msg_cmd_
->execute ((void *) amb
);
169 ACE_ERROR ((LM_ERROR
, "%t %p\n",
170 "Input_Device_Wrapper_Base::send_input_message: "
171 "command object not instantiated"));
177 Output_Device_Wrapper_Base::~Output_Device_Wrapper_Base ()
183 Bounded_Packet_Relay::Bounded_Packet_Relay (ACE_Thread_Manager
*input_task_mgr
,
184 Input_Device_Wrapper_Base
*input_wrapper
,
185 Output_Device_Wrapper_Base
*output_wrapper
)
187 input_task_mgr_ (input_task_mgr
),
188 input_wrapper_ (input_wrapper
),
189 output_wrapper_ (output_wrapper
),
190 queue_ (Bounded_Packet_Relay::DEFAULT_HWM
,
191 Bounded_Packet_Relay::DEFAULT_LWM
),
192 queue_hwm_ (Bounded_Packet_Relay::DEFAULT_HWM
),
193 queue_lwm_ (Bounded_Packet_Relay::DEFAULT_LWM
),
194 transmission_number_ (0),
196 status_ (Bounded_Packet_Relay::UN_INITIALIZED
),
197 transmission_start_ (ACE_Time_Value::zero
),
198 transmission_end_ (ACE_Time_Value::zero
)
200 if (input_task_mgr_
== 0)
201 input_task_mgr_
= ACE_Thread_Manager::instance ();
206 Bounded_Packet_Relay::~Bounded_Packet_Relay ()
208 // Reactivate the queue, and then clear it.
210 while (! queue_
.is_empty ())
212 ACE_Message_Block
*msg
;
213 queue_
.dequeue_head (msg
);
219 // Requests output be sent to output device.
222 Bounded_Packet_Relay::send_input ()
224 // Don't block, return immediately if queue is empty.
225 ACE_Message_Block
*item
;
227 // Using a separate (non-const) time value
228 // is necessary on some platforms
229 ACE_Time_Value
immediate (ACE_Time_Value::zero
);
231 if (queue_
.dequeue_head (item
,
235 // If a message block was dequeued, send it to the output device.
237 if (output_wrapper_
->write_output_message ((void *) item
) < 0)
240 ACE_ERROR ((LM_ERROR
,
242 "failed to write to output device object"));
247 // If all went OK, increase count of packets sent.
252 // Requests a transmission be started.
255 Bounded_Packet_Relay::start_transmission (u_long packet_count
,
256 u_long arrival_period
,
259 // Serialize access to start and end transmission calls, statistics
261 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX
, ace_mon
, this->transmission_lock_
, -1);
263 // If a transmission is already in progress, just return.
267 // Set transmission in progress flag true.
270 // Update statistics for a new transmission.
271 ++transmission_number_
;
274 transmission_start_
= ACE_OS::gettimeofday ();
276 // Reactivate the queue, and then clear it.
278 while (! queue_
.is_empty ())
280 ACE_Message_Block
*msg
;
281 queue_
.dequeue_head (msg
);
285 // Initialize the output device.
286 if (output_wrapper_
->modify_device_settings ((void *) &logging_level
) < 0)
288 status_
= ERROR_DETECTED
;
289 transmission_end_
= ACE_OS::gettimeofday ();
291 ACE_ERROR_RETURN ((LM_ERROR
, "%t %p\n",
292 "failed to initialize output device object"),
295 // Initialize the input device.
296 if (input_wrapper_
->modify_device_settings ((void *) &logging_level
) < 0)
298 status_
= ERROR_DETECTED
;
299 transmission_end_
= ACE_OS::gettimeofday ();
301 ACE_ERROR_RETURN ((LM_ERROR
, "%t %p\n",
302 "failed to initialize output device object"),
305 else if (input_wrapper_
->set_input_period (arrival_period
) < 0)
307 status_
= ERROR_DETECTED
;
308 transmission_end_
= ACE_OS::gettimeofday ();
310 ACE_ERROR_RETURN ((LM_ERROR
, "%t %p\n",
311 "failed to initialize input device object"),
314 else if (input_wrapper_
->set_send_count (packet_count
) < 0)
316 status_
= ERROR_DETECTED
;
317 transmission_end_
= ACE_OS::gettimeofday ();
319 ACE_ERROR_RETURN ((LM_ERROR
, "%t %p\n",
320 "failed to initialize input device object"),
323 // Activate the input device.
324 else if (input_wrapper_
->activate () < 0)
326 status_
= ERROR_DETECTED
;
327 transmission_end_
= ACE_OS::gettimeofday ();
329 ACE_ERROR_RETURN ((LM_ERROR
, "%t %p\n",
330 "failed to activate input device object"),
334 // If all went well, print a startup message and return success.
335 ACE_DEBUG ((LM_DEBUG
,
336 "\n\nTransmission %u started\n\n",
337 transmission_number_
));
341 // Requests a transmission be ended.
344 Bounded_Packet_Relay::end_transmission (Transmission_Status status
)
346 // Serialize access to start and end transmission calls,
347 // statistics reporting calls.
348 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX
, ace_mon
, this->transmission_lock_
, -1);
350 // If a transmission is not already in progress, just return.
354 // Set transmission in progress flag false.
357 // Ask the the input thread to stop.
358 if (input_wrapper_
->request_stop () < 0)
360 status_
= ERROR_DETECTED
;
361 transmission_end_
= ACE_OS::gettimeofday ();
362 ACE_ERROR_RETURN ((LM_ERROR
, "%t %p\n",
363 "failed asking input device thread to stop"),
367 // Deactivate the queue, allowing all waiting threads to continue.
368 queue_
.deactivate ();
370 // Wait for input thread to stop.
371 input_task_mgr_
->wait_task (input_wrapper_
);
373 // Reactivate the queue, and then clear it.
375 while (! queue_
.is_empty ())
377 ACE_Message_Block
*msg
;
378 queue_
.dequeue_head (msg
);
382 // If all went well, set passed status, stamp end time, print a
383 // termination message, and return success.
385 transmission_end_
= ACE_OS::gettimeofday ();
386 ACE_DEBUG ((LM_DEBUG
,
387 "\n\nTransmission %u ended with status: %s\n\n",
388 transmission_number_
, status_msg ()));
392 // Requests a report of statistics from the last transmission.
395 Bounded_Packet_Relay::report_statistics ()
397 // Serialize access to start and end transmission calls,
398 // statistics reporting calls.
399 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX
, ace_mon
, this->transmission_lock_
, -1);
401 // If a transmission is already in progress, just return.
405 // Calculate duration of trasmission.
406 ACE_Time_Value
duration (transmission_end_
);
407 duration
-= transmission_start_
;
409 // Report transmission statistics.
410 ACE_DEBUG ((LM_DEBUG
,
411 "\n\nStatisics for transmission %u:\n\n"
412 "Transmission status: %s\n"
413 "Start time: %d (sec) %d (usec)\n"
414 "End time: %d (sec) %d (usec)\n"
415 "Duration: %d (sec) %d (usec)\n"
416 "Packets relayed: %u\n\n",
417 transmission_number_
, status_msg (),
418 transmission_start_
.sec (),
419 transmission_start_
.usec (),
420 transmission_end_
.sec (),
421 transmission_end_
.usec (),
428 // Public entry point to which to push input.
431 Bounded_Packet_Relay::receive_input (void * arg
)
436 ACE_ERROR ((LM_ERROR
, "%t %p\n",
437 "Bounded_Packet_Relay::receive_input: "
442 ACE_Message_Block
*message
= static_cast<ACE_Message_Block
*> (arg
);
443 if (queue_
.enqueue_tail (message
) < 0)
446 ACE_ERROR ((LM_ERROR
, "%t %p\n",
447 "Bounded_Packet_Relay::receive_input failed"));
455 // Get high water mark for relay queue.
458 Bounded_Packet_Relay::queue_hwm ()
464 // Set high water mark for relay queue.
467 Bounded_Packet_Relay::queue_hwm (ACE_UINT32 hwm
)
472 // Get low water mark for relay queue.
475 Bounded_Packet_Relay::queue_lwm ()
480 // Set low water mark for relay queue.
483 Bounded_Packet_Relay::queue_lwm (ACE_UINT32 lwm
)
489 // Returns string corresponding to current status.
492 Bounded_Packet_Relay::status_msg ()
494 const char *status_msg
;
498 status_msg
= "uninitialized";
501 status_msg
= "in progress";
504 status_msg
= "completed with all packets sent";
507 status_msg
= "terminated by transmission duration timer";
510 status_msg
= "cancelled by external control";
513 status_msg
= "error was detected";
516 status_msg
= "unknown transmission status";