1 #include "tao/Transport.h"
3 #include "tao/LF_Follower.h"
4 #include "tao/Leader_Follower.h"
5 #include "tao/Client_Strategy_Factory.h"
6 #include "tao/Wait_Strategy.h"
7 #include "tao/Transport_Mux_Strategy.h"
9 #include "tao/Transport_Queueing_Strategies.h"
10 #include "tao/Connection_Handler.h"
11 #include "tao/GIOP_Message_Base.h"
12 #include "tao/Synch_Queued_Message.h"
13 #include "tao/Asynch_Queued_Message.h"
14 #include "tao/Flushing_Strategy.h"
15 #include "tao/Thread_Lane_Resources.h"
16 #include "tao/Resume_Handle.h"
17 #include "tao/Codeset_Manager.h"
18 #include "tao/Codeset_Translator_Base.h"
19 #include "tao/debug.h"
21 #include "tao/ORB_Core.h"
22 #include "tao/MMAP_Allocator.h"
23 #include "tao/SystemException.h"
24 #include "tao/operation_details.h"
25 #include "tao/Transport_Descriptor_Interface.h"
26 #include "tao/ORB_Time_Policy.h"
28 #include "ace/OS_NS_sys_time.h"
29 #include "ace/OS_NS_stdio.h"
30 #include "ace/Reactor.h"
31 #include "ace/os_include/sys/os_uio.h"
32 #include "ace/High_Res_Timer.h"
33 #include "ace/CORBA_macros.h"
34 #include "ace/Truncate.h"
36 #if !defined (__ACE_INLINE__)
37 # include "tao/Transport.inl"
38 #endif /* __ACE_INLINE__ */
// Static function in file scope.
// dump_iov: debugging aid that logs the contents of an iovec array that was
// handed to a send call.
//
//   iov, iovcnt       - the I/O vector and its number of entries
//   id                - transport id, printed in every log line
//   current_transfer  - number of bytes to dump; the loop stops once this
//                       many bytes have been covered, and an entry larger
//                       than the remainder is treated as partially sent
//   location          - name of the calling function, printed in the log
//
// NOTE(review): this listing has lost several short source lines (braces,
// the declaration of `len`, some macro arguments and parts of the header
// format string), so the text below is not compilable as shown.
44 dump_iov (iovec
*iov
, int iovcnt
, size_t id
,
45 size_t current_transfer
,
46 const ACE_TCHAR
*location
)
// Hold the ACE_Log_Msg singleton for the duration of the dump (guard object
// `ace_mon`), presumably so the multi-line output is not interleaved.
48 ACE_GUARD (ACE_Log_Msg
, ace_mon
, *ACE_Log_Msg::instance ());
50 TAOLIB_DEBUG ((LM_DEBUG
,
51 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::%s, ")
52 ACE_TEXT ("sending %d buffers\n"),
53 id
, location
, iovcnt
));
// Walk the entries until either all of them or all transferred bytes are
// covered.
55 for (int i
= 0; i
!= iovcnt
&& 0 < current_transfer
; ++i
)
57 size_t iov_len
= iov
[i
].iov_len
;
59 // Possibly a partially sent iovec entry.
60 if (current_transfer
< iov_len
)
62 iov_len
= current_transfer
;
65 TAOLIB_DEBUG ((LM_DEBUG
,
66 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::%s, ")
67 ACE_TEXT ("buffer %d/%d has %d bytes\n"),
// Hex-dump the entry in pieces of `len` bytes, each piece preceded by a
// header naming the transport, the location and the offset/length.
74 for (size_t offset
= 0; offset
< iov_len
; offset
+= len
)
76 ACE_TCHAR header
[1024];
77 ACE_OS::sprintf (header
,
79 ACE_TEXT("Transport[")
80 ACE_SIZE_T_FORMAT_SPECIFIER
83 ACE_SIZE_T_FORMAT_SPECIFIER
ACE_TEXT("/")
84 ACE_SIZE_T_FORMAT_SPECIFIER
ACE_TEXT(")"),
85 id
, location
, offset
, iov_len
);
87 len
= iov_len
- offset
;
94 TAOLIB_HEX_DUMP ((LM_DEBUG
,
95 static_cast<char*> (iov
[i
].iov_base
) + offset
,
// Account for the bytes of this entry that have been dumped.
99 current_transfer
-= iov_len
;
102 TAOLIB_DEBUG ((LM_DEBUG
,
103 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::%s, ")
104 ACE_TEXT ("end of data\n"),
108 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
#if TAO_HAS_TRANSPORT_CURRENT == 1
/// Out-of-line destructor of the per-transport statistics object.
/// It has nothing to release.
TAO::Transport::Stats::~Stats ()
{
}
#endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */
// Constructor.
//
//   tag             - protocol tag of this transport
//   orb_core        - ORB core the transport belongs to; also used to obtain
//                     the cached-connection lock and the client strategy
//                     factory
//   input_cdr_size  - presumably forwarded to the GIOP messaging object
//                     (the argument line is missing here) - TODO confirm
//
// Besides member initialization, the body allocates the GIOP messaging
// object and asks the client strategy factory for the wait strategy and the
// transport mux strategy.  With TAO_HAS_TRANSPORT_CURRENT == 1 it also
// allocates the statistics object, throwing CORBA::NO_MEMORY on failure.
//
// NOTE(review): several initializer lines and the braces are missing from
// this listing (the embedded line numbers skip), so the initializer list
// below is incomplete.
116 TAO_Transport::TAO_Transport (CORBA::ULong tag
,
117 TAO_ORB_Core
*orb_core
,
118 size_t input_cdr_size
)
120 , orb_core_ (orb_core
)
121 , cache_map_entry_ (nullptr)
124 , bidirectional_flag_ (-1)
125 , opening_connection_role_ (TAO::TAO_UNSPECIFIED_ROLE
)
128 , incoming_message_queue_ (orb_core
)
129 , current_deadline_ (ACE_Time_Value::zero
)
130 , flush_timer_id_ (-1)
131 , transport_timer_ (this)
132 , handler_lock_ (orb_core
->resource_factory ()->create_cached_connection_lock ())
// The transport id is derived from the object's own address.
133 , id_ ((size_t) this)
135 , recv_buffer_size_ (0)
136 , sent_byte_count_ (0)
137 , is_connected_ (false)
138 , connection_closed_on_read_ (false)
139 , messaging_object_ (nullptr)
140 , char_translator_ (nullptr)
141 , wchar_translator_ (nullptr)
143 , first_request_ (true)
144 , partial_message_ (nullptr)
145 #if TAO_HAS_SENDFILE == 1
146 // The ORB has been configured to use the MMAP allocator, meaning
147 // we could/should use sendfile() to send data.  Cast once here
148 // rather than during each send.  This assumes that all
149 // TAO_OutputCDR instances are using the same TAO_MMAP_Allocator
150 // instance as the underlying output CDR buffer allocator.
152 dynamic_cast<TAO_MMAP_Allocator
*> (
153 orb_core
->output_cdr_buffer_allocator ()))
154 #endif /* TAO_HAS_SENDFILE==1 */
155 #if TAO_HAS_TRANSPORT_CURRENT == 1
157 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */
158 , flush_in_post_open_ (false)
// Allocate the GIOP messaging object for this transport.
160 ACE_NEW (this->messaging_object_
,
161 TAO_GIOP_Message_Base (orb_core
,
// The client strategy factory supplies the wait and mux strategies.
165 TAO_Client_Strategy_Factory
*cf
=
166 this->orb_core_
->client_factory ();
169 this->ws_
= cf
->create_wait_strategy (this);
172 this->tms_
= cf
->create_transport_mux_strategy (this);
174 #if TAO_HAS_TRANSPORT_CURRENT == 1
// Statistics object; allocation failure is reported as CORBA::NO_MEMORY.
176 ACE_NEW_THROW_EX (this->stats_
,
177 TAO::Transport::Stats
,
178 CORBA::NO_MEMORY ());
179 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */
182 TAO_Transport::~TAO_Transport ()
184 if (TAO_debug_level
> 9)
186 TAOLIB_DEBUG ((LM_DEBUG
, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::~Transport\n"),
190 delete this->messaging_object_
;
196 delete this->handler_lock_
;
198 if (!this->is_connected_
)
200 // When we have a not connected transport we could have buffered
201 // messages on this transport which we have to cleanup now.
202 this->cleanup_queue_i();
205 // Release the partial message block, however we may
206 // have never allocated one.
207 ACE_Message_Block::release (this->partial_message_
);
209 // By the time the destructor is reached here all the connection stuff
210 // *must* have been cleaned up.
212 // The following assert is needed for the test "Bug_2494_Regression".
213 // See the bugzilla bug #2494 for details.
214 ACE_ASSERT (this->queue_is_empty_i ());
215 ACE_ASSERT (this->cache_map_entry_
== nullptr);
217 #if TAO_HAS_TRANSPORT_CURRENT == 1
219 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */
223 TAO_Transport::provide_handler (TAO::Connection_Handler_Set
&handlers
)
225 (void) this->add_reference ();
227 handlers
.insert (this->connection_handler_i ());
231 TAO_Transport::provide_blockable_handler (TAO::Connection_Handler_Set
&h
)
233 if (this->ws_
->non_blocking () ||
234 this->opening_connection_role_
== TAO::TAO_SERVER_ROLE
)
237 (void) this->add_reference ();
239 h
.insert (this->connection_handler_i ());
245 TAO_Transport::idle_after_send ()
247 return this->tms ()->idle_after_send ();
251 TAO_Transport::idle_after_reply ()
253 return this->tms ()->idle_after_reply ();
257 TAO_Transport::tear_listen_point_list (TAO_InputCDR
&)
259 ACE_NOTSUP_RETURN (-1);
263 TAO_Transport::send_message_shared (TAO_Stub
*stub
,
264 TAO_Message_Semantics message_semantics
,
265 const ACE_Message_Block
*message_block
,
266 ACE_Time_Value
*max_wait_time
)
271 ACE_GUARD_RETURN (ACE_Lock
, ace_mon
, *this->handler_lock_
, -1);
274 this->send_message_shared_i (stub
, message_semantics
,
275 message_block
, max_wait_time
);
280 // The connection needs to be closed here.
281 // In the case of a partially written message this is the only way to cleanup
282 // the physical connection as well as the Transport. An EOF on the remote end
283 // will cancel the partially received message.
284 this->close_connection ();
// post_connect_hook: judging by the name, a hook invoked once a connection
// has been established - TODO confirm against the class declaration.
// NOTE(review): only the signature line survives in this listing; the
// return type and the body are not visible here.
291 TAO_Transport::post_connect_hook ()
297 TAO_Transport::register_if_necessary ()
299 if (this->is_connected_
&&
300 this->wait_strategy ()->register_handler () == -1)
302 // Registration failures.
303 if (TAO_debug_level
> 0)
305 TAOLIB_ERROR ((LM_ERROR
,
306 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::register_if_necessary, ")
307 ACE_TEXT ("could not register the transport ")
308 ACE_TEXT ("in the reactor.\n"),
312 // Purge from the connection cache, if we are not in the cache, this
313 // just does nothing.
314 (void) this->purge_entry ();
316 // Close the handler.
317 (void) this->close_connection ();
325 TAO_Transport::close_connection ()
327 this->connection_handler_i ()->close_connection ();
// register_handler: registers this transport's event handler with the ORB
// core's reactor for READ events, under the handler lock.
//
// If the event handler is already attached to that reactor and either the
// wait strategy is non-blocking or the client factory does not use cleanup
// options, the function only logs "already registered" (the early return
// that presumably follows is missing from this listing - TODO confirm).
// Otherwise the wait strategy is flagged as registered and the result of
// ACE_Reactor::register_handler () is returned.
//
// NOTE(review): braces, several log arguments and the value returned when
// the guard cannot be acquired are missing from this listing.
331 TAO_Transport::register_handler ()
333 if (TAO_debug_level
> 4)
335 TAOLIB_DEBUG ((LM_DEBUG
,
336 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::register_handler\n"),
340 ACE_Reactor
* const r
= this->orb_core_
->reactor ();
342 // @@note: This should be okay since the register handler call will
343 // not make a nested call into the transport.
344 ACE_GUARD_RETURN (ACE_Lock
,
346 *this->handler_lock_
,
// Already attached to this reactor?  Then there is nothing to register.
349 if (r
== this->event_handler_i ()->reactor () &&
350 (this->wait_strategy ()->non_blocking () ||
351 !this->orb_core ()->client_factory ()->use_cleanup_options ()))
353 if (TAO_debug_level
> 6)
355 TAOLIB_DEBUG ((LM_DEBUG
,
356 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::register_handler - ")
357 ACE_TEXT ("already registered with reactor\n"),
364 if (TAO_debug_level
> 6)
366 TAOLIB_DEBUG ((LM_DEBUG
,
367 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::register_handler - ")
368 ACE_TEXT ("registering event handler with reactor\n"),
372 // Set the flag in the Connection Handler and in the Wait Strategy
373 // @@Maybe we should set these flags after registering with the
374 // reactor. What if the registration fails???
375 this->ws_
->is_registered (true);
377 // Register the handler with the reactor
378 return r
->register_handler (this->event_handler_i (),
379 ACE_Event_Handler::READ_MASK
);
// remove_handler: detaches this transport's event handler from the ORB
// core's reactor, under the handler lock.
//
// When the event handler has no reactor there is nothing to remove (the
// body of that branch is missing from this listing - presumably an early
// return, TODO confirm).  Otherwise the wait strategy is flagged as
// unregistered, the handler is removed with READ_MASK | DONT_CALL, and the
// handler's reactor pointer is reset so a later register_handler () call
// registers it again.
//
// NOTE(review): braces, several log arguments, the guard's failure value
// and the return statements are missing from this listing.
383 TAO_Transport::remove_handler ()
385 if (TAO_debug_level
> 4)
387 TAOLIB_DEBUG ((LM_DEBUG
,
388 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::remove_handler\n"),
392 ACE_Reactor
* const r
= this->orb_core_
->reactor ();
394 // @@note: This should be okay since the remove handler call will
395 // not make a nested call into the transport.
396 ACE_GUARD_RETURN (ACE_Lock
,
398 *this->handler_lock_
,
// Not attached to any reactor.
402 if (this->event_handler_i ()->reactor () == nullptr)
407 if (TAO_debug_level
> 6)
409 TAOLIB_DEBUG ((LM_DEBUG
,
410 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::remove_handler - ")
411 ACE_TEXT ("removing event handler from reactor\n"),
415 // Set the flag in the Wait Strategy
416 this->ws_
->is_registered (false);
418 // Remove the handler from the reactor
419 if (r
->remove_handler (this->event_handler_i (),
420 ACE_Event_Handler::READ_MASK
|
421 ACE_Event_Handler::DONT_CALL
) == -1)
423 if (TAO_debug_level
> 0)
424 TAOLIB_ERROR ((LM_ERROR
,
425 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::remove_handler - ")
426 ACE_TEXT ("reactor->remove_handler failed\n"),
432 // reset the reactor property of the event handler or
433 // Transport::register_handler() will not re-register
434 // when called after us again.
435 this->event_handler_i ()->reactor (nullptr);
#if TAO_HAS_SENDFILE == 1
/// Base-class fallback for sendfile(): the allocator is ignored and the
/// data described by @a iov / @a iovcnt is written with send ().
///
/// @param iov                I/O vector to send.
/// @param iovcnt             Number of entries in @a iov.
/// @param bytes_transferred  Set by send () to the number of bytes written.
/// @param dc                 Drain constraints; converted to an I/O
///                           timeout with io_timeout ().
/// @return Whatever send () returns.
ssize_t
TAO_Transport::sendfile (TAO_MMAP_Allocator * /* allocator */,
                         iovec * iov,
                         int iovcnt,
                         size_t &bytes_transferred,
                         TAO::Transport::Drain_Constraints const & dc)
{
  // Concrete pluggable transport doesn't implement sendfile().
  // Fallback on TAO_Transport::send().

  // @@ We can probably refactor the TAO_IIOP_Transport::sendfile()
  //    implementation to this base class method, and leave any TCP
  //    specific configuration out of this base class method.
  return this->send (iov, iovcnt, bytes_transferred,
                     this->io_timeout (dc));
}
#endif /* TAO_HAS_SENDFILE==1 */
461 TAO_Transport::generate_locate_request (
462 TAO_Target_Specification
&spec
,
463 TAO_Operation_Details
&opdetails
,
464 TAO_OutputCDR
&output
)
466 if (this->messaging_object ()->generate_locate_request_header (opdetails
,
470 if (TAO_debug_level
> 0)
472 TAOLIB_ERROR ((LM_ERROR
,
473 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::generate_locate_request, ")
474 ACE_TEXT ("error while marshalling the LocateRequest header\n"),
485 TAO_Transport::generate_request_header (
486 TAO_Operation_Details
&opdetails
,
487 TAO_Target_Specification
&spec
,
488 TAO_OutputCDR
&output
)
490 if (this->messaging_object ()->generate_request_header (opdetails
,
494 if (TAO_debug_level
> 0)
496 TAOLIB_ERROR ((LM_ERROR
,
497 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::generate_request_header, ")
498 ACE_TEXT ("error while marshalling the Request header\n"),
508 /// @todo Ideally the following should be inline.
509 /// @todo purge_entry has a return value, use it
511 TAO_Transport::recache_transport (TAO_Transport_Descriptor_Interface
*desc
)
513 // First purge our entry
514 this->purge_entry ();
516 // Then add ourselves to the cache
517 return this->transport_cache_manager ().cache_transport (desc
, this);
521 TAO_Transport::purge_entry ()
523 if (TAO_debug_level
> 3)
525 TAOLIB_DEBUG ((LM_DEBUG
,
526 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::purge_entry, ")
527 ACE_TEXT ("entry is %@\n"),
528 this->id (), this->cache_map_entry_
));
531 return this->transport_cache_manager ().purge_entry (this->cache_map_entry_
);
535 TAO_Transport::can_be_purged ()
537 return !this->tms_
->has_request ();
541 TAO_Transport::make_idle ()
543 if (TAO_debug_level
> 3)
545 TAOLIB_DEBUG ((LM_DEBUG
,
546 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::make_idle\n"),
550 return this->transport_cache_manager ().make_idle (this->cache_map_entry_
);
554 TAO_Transport::update_transport ()
556 return this->transport_cache_manager ().update_entry (this->cache_map_entry_
);
// Methods called and used in the output path of the ORB.
562 TAO_Transport::Drain_Result
563 TAO_Transport::handle_output (TAO::Transport::Drain_Constraints
const & dc
)
565 if (TAO_debug_level
> 3)
567 TAOLIB_DEBUG ((LM_DEBUG
,
568 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_output")
569 ACE_TEXT (" - block_on_io=%d, timeout=%d.%06d\n"),
572 dc
.timeout() ? dc
.timeout()->sec() : static_cast<time_t> (-1),
573 dc
.timeout() ? dc
.timeout()->usec() : -1 ));
576 // The flushing strategy (potentially via the Reactor) wants to send
577 // more data, first check if there is a current message that needs
579 Drain_Result
const retval
= this->drain_queue (dc
);
581 if (TAO_debug_level
> 3)
583 TAOLIB_DEBUG ((LM_DEBUG
,
584 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_output, ")
585 ACE_TEXT ("drain_queue returns %d/%d\n"),
587 static_cast<int> (retval
.dre_
), ACE_ERRNO_GET
));
590 // Any errors are returned directly to the Reactor
595 TAO_Transport::format_queue_message (TAO_OutputCDR
&stream
,
596 ACE_Time_Value
*max_wait_time
,
599 if (this->messaging_object ()->format_message (stream
, stub
, nullptr) != 0)
602 if (this->queue_message_i (stream
.begin (), max_wait_time
) != 0)
605 // check the buffering constraints to see what must be done in post_open()
606 bool must_flush
= false;
607 this->flush_in_post_open_
|=
608 this->check_buffering_constraints_i (stub
,
615 TAO_Transport::send_message_block_chain (const ACE_Message_Block
*mb
,
616 size_t &bytes_transferred
,
617 ACE_Time_Value
*max_wait_time
)
619 ACE_GUARD_RETURN (ACE_Lock
, ace_mon
, *this->handler_lock_
, -1);
621 TAO::Transport::Drain_Constraints
dc(
622 max_wait_time
, true);
624 return this->send_message_block_chain_i (mb
,
630 TAO_Transport::send_message_block_chain_i (const ACE_Message_Block
*mb
,
631 size_t &bytes_transferred
,
632 TAO::Transport::Drain_Constraints
const & dc
)
634 size_t const total_length
= mb
->total_length ();
636 // We are going to block, so there is no need to clone
637 // the message block.
638 TAO_Synch_Queued_Message
synch_message (mb
, this->orb_core_
);
640 synch_message
.push_back (this->head_
, this->tail_
);
642 Drain_Result
const n
= this->drain_queue_i (dc
);
646 synch_message
.remove_from_list (this->head_
, this->tail_
);
647 return -1; // Error while sending...
649 else if (n
== DR_QUEUE_EMPTY
)
651 bytes_transferred
= total_length
;
652 return 1; // Empty queue, message was sent..
655 // Remove the temporary message from the queue...
656 synch_message
.remove_from_list (this->head_
, this->tail_
);
658 bytes_transferred
= total_length
- synch_message
.message_length ();
664 TAO_Transport::send_synchronous_message_i (const ACE_Message_Block
*mb
,
665 ACE_Time_Value
*max_wait_time
)
667 // We are going to block, so there is no need to clone
668 // the message block.
669 size_t const total_length
= mb
->total_length ();
670 TAO_Synch_Queued_Message
synch_message (mb
, this->orb_core_
);
672 synch_message
.push_back (this->head_
, this->tail_
);
674 int const result
= this->send_synch_message_helper_i (synch_message
,
676 if (result
== -1 && errno
== ETIME
)
678 if (total_length
== synch_message
.message_length ()) //none was sent
680 if (TAO_debug_level
> 2)
682 TAOLIB_DEBUG ((LM_DEBUG
,
683 ACE_TEXT ("TAO (%P|%t) - ")
684 ACE_TEXT ("Transport[%d]::send_synchronous_message_i, ")
685 ACE_TEXT ("timeout encountered before any bytes sent\n"),
688 throw ::CORBA::TIMEOUT (
689 CORBA::SystemException::_tao_minor_code (
690 TAO_TIMEOUT_SEND_MINOR_CODE
,
692 CORBA::COMPLETED_NO
);
699 else if(result
== -1 || result
== 1)
704 TAO_Flushing_Strategy
*flushing_strategy
=
705 this->orb_core ()->flushing_strategy ();
706 if (flushing_strategy
->schedule_output (this) == -1)
708 synch_message
.remove_from_list (this->head_
, this->tail_
);
709 if (TAO_debug_level
> 0)
711 TAOLIB_ERROR ((LM_ERROR
,
712 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::")
713 ACE_TEXT ("send_synchronous_message_i, ")
714 ACE_TEXT ("error while scheduling flush - %m\n"),
720 // No need to check for result == TAO_Flushing_Strategy::MUST_FLUSH,
721 // because we're always going to flush anyway.
723 // Release the mutex, other threads may modify the queue as we
724 // block for a long time writing out data.
727 typedef ACE_Reverse_Lock
<ACE_Lock
> TAO_REVERSE_LOCK
;
728 TAO_REVERSE_LOCK
reverse (*this->handler_lock_
);
729 ACE_GUARD_RETURN (TAO_REVERSE_LOCK
, ace_mon
, reverse
, -1);
731 flush_result
= flushing_strategy
->flush_message (this,
736 if (flush_result
== -1)
738 synch_message
.remove_from_list (this->head_
, this->tail_
);
740 // We don't need to do anything special for the timeout case.
741 // The connection is going to get closed and the Transport destroyed.
742 // The only thing to do maybe is to empty the queue.
744 if (TAO_debug_level
> 0)
746 TAOLIB_ERROR ((LM_ERROR
,
747 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_synchronous_message_i, ")
748 ACE_TEXT ("error while sending message - %m\n"),
759 TAO_Transport::send_reply_message_i (const ACE_Message_Block
*mb
,
760 ACE_Time_Value
*max_wait_time
)
762 // Don't clone now.. We could be sent in one shot!
763 TAO_Synch_Queued_Message
synch_message (mb
, this->orb_core_
);
765 synch_message
.push_back (this->head_
, this->tail_
);
768 this->send_synch_message_helper_i (synch_message
, max_wait_time
);
770 // What about partially sent messages.
771 if (n
== -1 || n
== 1)
776 if (TAO_debug_level
> 3)
778 TAOLIB_DEBUG ((LM_DEBUG
,
779 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_reply_message_i, ")
780 ACE_TEXT ("preparing to add to queue before leaving\n"),
784 // Till this point we shouldn't have any copying and that is the
785 // point anyway. Now, remove the node from the list
786 synch_message
.remove_from_list (this->head_
, this->tail_
);
788 // Clone the node that we have.
789 TAO_Queued_Message
*msg
=
790 synch_message
.clone (this->orb_core_
->transport_message_buffer_allocator ());
792 // Stick it in the queue
793 msg
->push_back (this->head_
, this->tail_
);
795 TAO_Flushing_Strategy
*flushing_strategy
=
796 this->orb_core ()->flushing_strategy ();
798 int const result
= flushing_strategy
->schedule_output (this);
802 if (TAO_debug_level
> 5)
804 TAOLIB_DEBUG ((LM_DEBUG
, "TAO (%P|%t) - Transport[%d]::send_reply_"
805 "message_i, dequeuing msg due to schedule_output "
806 "failure\n", this->id ()));
808 msg
->remove_from_list (this->head_
, this->tail_
);
811 else if (result
== TAO_Flushing_Strategy::MUST_FLUSH
)
813 typedef ACE_Reverse_Lock
<ACE_Lock
> TAO_REVERSE_LOCK
;
814 TAO_REVERSE_LOCK
reverse (*this->handler_lock_
);
815 ACE_GUARD_RETURN (TAO_REVERSE_LOCK
, ace_mon
, reverse
, -1);
816 (void) flushing_strategy
->flush_transport (this, nullptr);
823 TAO_Transport::send_synch_message_helper_i (TAO_Synch_Queued_Message
&synch_message
,
824 ACE_Time_Value
* max_wait_time
)
826 TAO::Transport::Drain_Constraints
dc(
827 max_wait_time
, this->using_blocking_io_for_synch_messages());
829 Drain_Result
const n
= this->drain_queue_i (dc
);
833 synch_message
.remove_from_list (this->head_
, this->tail_
);
834 return -1; // Error while sending...
836 else if (n
== DR_QUEUE_EMPTY
)
838 return 1; // Empty queue, message was sent..
841 if (synch_message
.all_data_sent ())
850 TAO_Transport::schedule_output_i ()
852 ACE_Event_Handler
* const eh
= this->event_handler_i ();
853 ACE_Reactor
* const reactor
= eh
->reactor ();
855 if (reactor
== nullptr)
857 if (TAO_debug_level
> 1)
859 TAOLIB_ERROR ((LM_ERROR
,
860 ACE_TEXT ("TAO (%P|%t) - ")
861 ACE_TEXT ("Transport[%d]::schedule_output_i, ")
862 ACE_TEXT ("no reactor,")
863 ACE_TEXT ("returning -1\n"),
869 // Check to see if our event handler is still registered with the
870 // reactor. It's possible for another thread to have run close_connection()
871 // since we last used the event handler.
872 ACE_Event_Handler
* const found
= reactor
->find_handler (eh
->get_handle ());
875 found
->remove_reference ();
879 if (TAO_debug_level
> 3)
881 TAOLIB_ERROR ((LM_ERROR
,
882 ACE_TEXT ("TAO (%P|%t) - ")
883 ACE_TEXT ("Transport[%d]::schedule_output_i ")
884 ACE_TEXT ("event handler not found in reactor,")
885 ACE_TEXT ("returning -1\n"),
893 if (TAO_debug_level
> 3)
895 TAOLIB_DEBUG ((LM_DEBUG
,
896 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::schedule_output_i\n"),
900 return reactor
->schedule_wakeup (eh
, ACE_Event_Handler::WRITE_MASK
);
904 TAO_Transport::cancel_output_i ()
906 ACE_Event_Handler
* const eh
= this->event_handler_i ();
907 ACE_Reactor
*const reactor
= eh
->reactor ();
909 if (TAO_debug_level
> 3)
911 TAOLIB_DEBUG ((LM_DEBUG
,
912 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cancel_output_i\n"),
916 return reactor
->cancel_wakeup (eh
, ACE_Event_Handler::WRITE_MASK
);
920 TAO_Transport::handle_timeout (const ACE_Time_Value
& /* current_time */,
923 if (TAO_debug_level
> 6)
925 TAOLIB_DEBUG ((LM_DEBUG
,
926 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_timeout, ")
927 ACE_TEXT ("timer expired\n"),
931 /// This is the only legal ACT in the current configuration....
932 if (act
!= &this->current_deadline_
)
937 if (this->flush_timer_pending ())
939 // The timer is always a oneshot timer, so mark is as not
941 this->reset_flush_timer ();
943 TAO_Flushing_Strategy
*flushing_strategy
=
944 this->orb_core ()->flushing_strategy ();
945 int const result
= flushing_strategy
->schedule_output (this);
946 if (result
== TAO_Flushing_Strategy::MUST_FLUSH
)
948 typedef ACE_Reverse_Lock
<ACE_Lock
> TAO_REVERSE_LOCK
;
949 TAO_REVERSE_LOCK
reverse (*this->handler_lock_
);
950 ACE_GUARD_RETURN (TAO_REVERSE_LOCK
, ace_mon
, reverse
, -1);
951 if (flushing_strategy
->flush_transport (this, nullptr) == -1) {
960 TAO_Transport::Drain_Result
961 TAO_Transport::drain_queue (TAO::Transport::Drain_Constraints
const & dc
)
963 ACE_GUARD_RETURN (ACE_Lock
, ace_mon
, *this->handler_lock_
, DR_ERROR
);
964 Drain_Result
const retval
= this->drain_queue_i (dc
);
966 if (retval
== DR_QUEUE_EMPTY
)
968 // ... there is no current message or it was completely
969 // sent, cancel output...
970 TAO_Flushing_Strategy
*flushing_strategy
=
971 this->orb_core ()->flushing_strategy ();
973 flushing_strategy
->cancel_output (this);
981 TAO_Transport::Drain_Result
982 TAO_Transport::drain_queue_helper (int &iovcnt
, iovec iov
[],
983 TAO::Transport::Drain_Constraints
const & dc
)
985 // As a side-effect, this decrements the timeout() pointed-to value by
986 // the time used in this function. That might be important as there are
987 // potentially long running system calls invoked from here.
988 TAO::ORB_Countdown_Time
countdown(dc
.timeout());
990 size_t byte_count
= 0;
992 // ... send the message ...
995 #if TAO_HAS_SENDFILE == 1
996 if (this->mmap_allocator_
)
997 retval
= this->sendfile (this->mmap_allocator_
,
1003 #endif /* TAO_HAS_SENDFILE==1 */
1004 retval
= this->send (iov
, iovcnt
, byte_count
,
1005 this->io_timeout (dc
));
1007 if (TAO_debug_level
> 9)
1009 dump_iov (iov
, iovcnt
, this->id (),
1010 byte_count
, ACE_TEXT("drain_queue_helper"));
1015 if (TAO_debug_level
> 4)
1017 TAOLIB_DEBUG ((LM_DEBUG
,
1018 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ")
1019 ACE_TEXT ("send() returns 0\n"),
1024 else if (retval
== -1)
1026 if (TAO_debug_level
> 4)
1028 TAOLIB_DEBUG ((LM_DEBUG
,
1029 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ")
1030 ACE_TEXT ("error during send() (errno: %d) - %m\n"),
1031 this->id (), ACE_ERRNO_GET
));
1034 if (errno
== EWOULDBLOCK
|| errno
== EAGAIN
)
1036 return DR_WOULDBLOCK
;
1042 // ... now we need to update the queue, removing elements
1043 // that have been sent, and updating the last element if it
1044 // was only partially sent ...
1045 this->cleanup_queue (byte_count
);
1048 // ... start over, how do we guarantee progress? Because if
1049 // no bytes are sent send() can only return 0 or -1
1051 // Total no. of bytes sent for a send call
1052 this->sent_byte_count_
+= byte_count
;
1054 if (TAO_debug_level
> 4)
1056 TAOLIB_DEBUG ((LM_DEBUG
,
1057 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ")
1058 ACE_TEXT ("byte_count = %d, head_is_empty = %d\n"),
1059 this->id(), byte_count
, this->queue_is_empty_i ()));
1062 return DR_QUEUE_EMPTY
;
1063 // drain_queue_i will check if the queue is actually empty
1066 TAO_Transport::Drain_Result
1067 TAO_Transport::drain_queue_i (TAO::Transport::Drain_Constraints
const & dc
)
1069 // This is the vector used to send data, it must be declared outside
1070 // the loop because after the loop there may still be data to be
1073 #if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE)
1074 iovec iov
[ACE_IOV_MAX
] = { { nullptr , 0 } };
1076 iovec iov
[ACE_IOV_MAX
];
1077 #endif /* ACE_INITIALIZE_MEMORY_BEFORE_USE */
1079 // We loop over all the elements in the queue ...
1080 TAO_Queued_Message
*i
= this->head_
;
1082 // Reset the value so that the counting is done for each new send
1084 this->sent_byte_count_
= 0;
1086 // Avoid calling this expensive function each time through the loop. Instead
1087 // we'll assume that the time is unlikely to change much during the loop.
1088 // If we are forced to send in the loop then we'll recompute the time.
1089 ACE_Time_Value now
= ACE_High_Res_Timer::gettimeofday_hr ();
1091 while (i
!= nullptr)
1093 if (i
->is_expired (now
))
1095 if (TAO_debug_level
> 3)
1097 TAOLIB_DEBUG ((LM_DEBUG
,
1098 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_i, ")
1099 ACE_TEXT ("Discarding expired queued message.\n"),
1102 TAO_Queued_Message
*next
= i
->next ();
1103 i
->state_changed (TAO_LF_Event::LFS_TIMEOUT
,
1104 this->orb_core_
->leader_follower ());
1105 i
->remove_from_list (this->head_
, this->tail_
);
1110 // ... each element fills the iovector ...
1111 i
->fill_iov (ACE_IOV_MAX
, iovcnt
, iov
);
1113 // ... the vector is full, no choice but to send some data out.
1114 // We need to loop because a single message can span multiple
1115 // IOV_MAX elements ...
1116 if (iovcnt
== ACE_IOV_MAX
)
1118 Drain_Result
const retval
=
1119 this->drain_queue_helper (iovcnt
, iov
, dc
);
1121 if (TAO_debug_level
> 4)
1123 TAOLIB_DEBUG ((LM_DEBUG
,
1124 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_i, ")
1125 ACE_TEXT ("helper retval = %d\n"),
1126 this->id (), static_cast<int> (retval
.dre_
)));
1129 if (retval
!= DR_QUEUE_EMPTY
)
1134 now
= ACE_High_Res_Timer::gettimeofday_hr ();
1139 // ... notice that this line is only reached if there is still
1140 // room in the iovector ...
1146 Drain_Result
const retval
= this->drain_queue_helper (iovcnt
, iov
, dc
);
1148 if (TAO_debug_level
> 4)
1150 TAOLIB_DEBUG ((LM_DEBUG
,
1151 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_i, ")
1152 ACE_TEXT ("helper retval = %d\n"),
1153 this->id (), static_cast<int> (retval
.dre_
)));
1156 if (retval
!= DR_QUEUE_EMPTY
)
1162 if (this->queue_is_empty_i ())
1164 if (this->flush_timer_pending ())
1166 ACE_Event_Handler
*eh
= this->event_handler_i ();
1167 ACE_Reactor
* const reactor
= eh
->reactor ();
1168 reactor
->cancel_timer (this->flush_timer_id_
);
1169 this->reset_flush_timer ();
1172 return DR_QUEUE_EMPTY
;
// Discards every message that is still on the outgoing queue.
// Each head element is told that its state changed to
// LFS_CONNECTION_CLOSED and is then unlinked from the head_/tail_
// list; the loop repeats until queue_is_empty_i () reports true.
// With TAO_debug_level > 4 the discarded byte total is accumulated and
// logged once the queue is empty.
// NOTE(review): msg_count is logged at the end, but neither its
// declaration nor its update is visible in this excerpt - confirm
// against the complete file.
1179 TAO_Transport::cleanup_queue_i ()
1181 if (TAO_debug_level
> 4)
1183 TAOLIB_DEBUG ((LM_DEBUG
,
1184 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue_i, ")
1185 ACE_TEXT ("cleaning up complete queue\n"),
// Running total of message_length () over the discarded messages.
1189 size_t byte_count
= 0;
1192 // Cleanup all messages
1193 while (!this->queue_is_empty_i ())
1195 TAO_Queued_Message
*i
= this->head_
;
1197 if (TAO_debug_level
> 4)
1199 byte_count
+= i
->message_length();
1202 // @@ This is a good point to insert a flag to indicate that a
1203 // CloseConnection message was successfully received.
1204 i
->state_changed (TAO_LF_Event::LFS_CONNECTION_CLOSED
,
1205 this->orb_core_
->leader_follower ());
1207 i
->remove_from_list (this->head_
, this->tail_
);
1212 if (TAO_debug_level
> 4)
1214 TAOLIB_DEBUG ((LM_DEBUG
,
1215 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue_i, ")
1216 ACE_TEXT ("discarded %d messages, %u bytes.\n"),
1217 this->id (), msg_count
, byte_count
));
// Accounts for byte_count bytes that were written to the connection
// by walking the outgoing queue from its head.
//
// @param byte_count  Number of sent bytes still to be attributed to
//                    queued messages; the loop runs while it is > 0
//                    and the queue is not empty.
//
// For the head message bytes_transferred (byte_count) is called; a
// message that reports all_data_sent () is unlinked from the list.
// NOTE(review): the second trace and the byte_count == 0 test suggest
// that bytes_transferred () reduces byte_count through a reference
// parameter - confirm in TAO_Queued_Message.
1222 TAO_Transport::cleanup_queue (size_t byte_count
)
1224 while (!this->queue_is_empty_i () && byte_count
> 0)
1226 TAO_Queued_Message
*i
= this->head_
;
1228 if (TAO_debug_level
> 4)
1230 TAOLIB_DEBUG ((LM_DEBUG
,
1231 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue, ")
1232 ACE_TEXT ("byte_count = %d\n"),
1233 this->id (), byte_count
));
1236 // Update the state of the first message
1237 i
->bytes_transferred (byte_count
);
1239 if (TAO_debug_level
> 4)
1241 TAOLIB_DEBUG ((LM_DEBUG
,
1242 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue, ")
1243 ACE_TEXT ("after transfer, bc = %d, all_sent = %d, ml = %d\n"),
1244 this->id (), byte_count
, i
->all_data_sent (),
1245 i
->message_length ()));
1248 // ... if all the data was sent the message must be removed from
1250 if (i
->all_data_sent ())
1252 i
->remove_from_list (this->head_
, this->tail_
);
1255 else if (byte_count
== 0)
1257 // If we have sent out a full message block, but we are not
1258 // finished with this message, we need to do something with the
1259 // message block chain held by our output stream. If we don't,
1260 // another thread can attempt to service this transport and end
1261 // up resetting the output stream which will release the
1262 // message that we haven't finished sending.
1263 i
->copy_if_necessary (this->out_stream ().begin ());
// Asks the stub's queueing strategy whether the buffering constraints
// for the outgoing queue have been reached and, if requested and the
// transport is connected, (re)schedules the flush timer.
//
// @param stub        Stub whose transport_queueing_strategy () is
//                    consulted.
// @param must_flush  Reference parameter; its use is on lines not
//                    visible in this excerpt (presumably an output of
//                    buffering_constraints_reached () - confirm).
// @return            constraints_reached, which starts out true and is
//                    overwritten with the strategy's answer.
1269 TAO_Transport::check_buffering_constraints_i (TAO_Stub
*stub
, bool &must_flush
)
1271 // First let's compute the size of the queue:
1272 size_t msg_count
= 0;
1273 size_t total_bytes
= 0;
1275 for (TAO_Queued_Message
*i
= this->head_
; i
!= nullptr; i
= i
->next ())
1278 total_bytes
+= i
->message_length ();
1281 bool set_timer
= false;
1282 ACE_Time_Value new_deadline
;
1284 TAO::Transport_Queueing_Strategy
*queue_strategy
=
1285 stub
->transport_queueing_strategy ();
1287 bool constraints_reached
= true;
1291 constraints_reached
=
1292 queue_strategy
->buffering_constraints_reached (stub
,
1296 this->current_deadline_
,
1305 // ... set the new timer, also cancel any previous timers ...
1306 // Check for connected state since this method may be called
1307 // before the connection is established and then there will be no
1308 // reactor available yet.
1309 if (set_timer
&& this->is_connected_
)
1311 ACE_Event_Handler
*eh
= this->event_handler_i ();
1312 ACE_Reactor
* const reactor
= eh
->reactor ();
1313 this->current_deadline_
= new_deadline
;
1314 ACE_Time_Value delay
= new_deadline
- ACE_OS::gettimeofday ();
// A timer that is still pending is cancelled before the new one is
// scheduled, so flush_timer_id_ always names the latest timer.
1316 if (this->flush_timer_pending ())
1318 reactor
->cancel_timer (this->flush_timer_id_
);
1321 this->flush_timer_id_
=
1322 reactor
->schedule_timer (&this->transport_timer_
,
1323 &this->current_deadline_
,
1327 return constraints_reached
;
// Emits a debug trace (only when TAO_debug_level > 0) stating that
// this transport is no longer associated with its handler.
//
// @param caller  Name printed inside the parentheses of the message,
//                identifying the code that detected the condition.
// The transport id and tag_ are included in the message as well.
1331 TAO_Transport::report_invalid_event_handler (const char *caller
)
1333 if (TAO_debug_level
> 0)
1335 TAOLIB_DEBUG ((LM_DEBUG
,
1336 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::report_invalid_event_handler")
1337 ACE_TEXT ("(%C) no longer associated with handler [tag=%d]\n"),
1338 this->id (), caller
, this->tag_
));
// Locking entry point for the connection-closed notifications: takes
// handler_lock_ (in ACE_MT builds), runs the unlocked _i variant and
// then informs the transport mux strategy through
// tms ()->connection_closed ().
// NOTE(review): the braces delimiting the guard's scope are not
// visible here, so whether the tms () call happens with or without the
// lock held cannot be determined from this excerpt.
1343 TAO_Transport::send_connection_closed_notifications ()
1346 ACE_MT (ACE_GUARD (ACE_Lock
, guard
, *this->handler_lock_
));
1348 this->send_connection_closed_notifications_i ();
1351 this->tms ()->connection_closed ();
// Unlocked variant: forwards to cleanup_queue_i () to drop whatever is
// still on the outgoing queue.
1355 TAO_Transport::send_connection_closed_notifications_i ()
1357 this->cleanup_queue_i ();
// Dispatches an outgoing message to the send routine that matches its
// semantics:
//   TAO_TWOWAY_REQUEST -> send_synchronous_message_i ()
//   TAO_REPLY          -> send_reply_message_i ()
//   TAO_ONEWAY_REQUEST -> send_asynchronous_message_i ()
//
// @param stub               Passed on to the asynchronous path.
// @param message_semantics  Its type_ member selects the branch.
// @param message_block      Message data to send.
// @param max_wait_time      Timeout handed to the selected routine.
//
// When TAO_HAS_TRANSPORT_CURRENT == 1 the block length is captured
// before sending and reported to stats_->messages_sent () if the send
// did not yield -1 and stats_ is non-null.
// NOTE(review): the declaration of ret, the break statements and the
// final return are not visible in this excerpt.
1361 TAO_Transport::send_message_shared_i (TAO_Stub
*stub
,
1362 TAO_Message_Semantics message_semantics
,
1363 const ACE_Message_Block
*message_block
,
1364 ACE_Time_Value
*max_wait_time
)
1368 #if TAO_HAS_TRANSPORT_CURRENT == 1
1369 size_t const message_length
= message_block
->length ();
1370 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */
1372 switch (message_semantics
.type_
)
1374 case TAO_Message_Semantics::TAO_TWOWAY_REQUEST
:
1375 ret
= this->send_synchronous_message_i (message_block
, max_wait_time
);
1378 case TAO_Message_Semantics::TAO_REPLY
:
1379 ret
= this->send_reply_message_i (message_block
, max_wait_time
);
1382 case TAO_Message_Semantics::TAO_ONEWAY_REQUEST
:
1383 ret
= this->send_asynchronous_message_i (stub
,
1389 #if TAO_HAS_TRANSPORT_CURRENT == 1
1390 // "Count" the message, only if no error was encountered.
1391 if (ret
!= -1 && this->stats_
!= nullptr)
1392 this->stats_
->messages_sent (message_length
);
1393 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */
// Sends a message without waiting for a reply: the message is either
// written directly, queued, or partly written and the rest queued.
//
// @param stub           Supplies the transport queueing strategy and
//                       is handed to check_buffering_constraints_i ().
// @param message_block  Head of the message block chain to send.
// @param max_wait_time  Timeout for the direct send, for a queued
//                       (not partially sent) message and for the
//                       flush.
//
// Outline, as far as it is visible in this excerpt:
//  1. Decide whether a direct send is attempted first
//     (try_sending_first); the queueing strategy's must_queue () can
//     veto it.
//  2. On a direct send, errors other than EWOULDBLOCK/ETIME are
//     reported as fatal; a complete send is the fast path.
//  3. ETIME with zero bytes written raises CORBA::TIMEOUT.
//  4. Otherwise the unsent remainder is queued via queue_message_i ().
//  5. A timeout after a partial send is treated as "must close"; with
//     no timeout the buffering constraints are checked and output is
//     scheduled or the transport flushed.
// NOTE(review): several declarations (n, ret), return statements and
// closing braces fall on lines missing from this excerpt, so the exact
// return values of each path cannot be documented from here.
1399 TAO_Transport::send_asynchronous_message_i (TAO_Stub
*stub
,
1400 const ACE_Message_Block
*message_block
,
1401 ACE_Time_Value
*max_wait_time
)
1403 // Let's figure out if the message should be queued without trying
1405 bool try_sending_first
= true;
1407 bool const queue_empty
= this->queue_is_empty_i ();
1409 TAO::Transport_Queueing_Strategy
*queue_strategy
=
1410 stub
->transport_queueing_strategy ();
1414 try_sending_first
= false;
1416 else if (queue_strategy
)
1418 if (queue_strategy
->must_queue (queue_empty
))
1420 try_sending_first
= false;
1424 bool partially_sent
= false;
1425 bool timeout_encountered
= false;
1427 TAO::Transport::Drain_Constraints
dc(
1428 max_wait_time
, this->using_blocking_io_for_asynch_messages());
1430 if (try_sending_first
)
1433 size_t byte_count
= 0;
1434 // ... in this case we must try to send the message first ...
1436 size_t const total_length
= message_block
->total_length ();
1438 if (TAO_debug_level
> 6)
1440 TAOLIB_DEBUG ((LM_DEBUG
,
1441 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
1442 ACE_TEXT ("trying to send the message (ml = %d)\n"),
1443 this->id (), total_length
));
1446 // @@ I don't think we want to hold the mutex here, however if
1447 // we release it we need to recheck the status of the transport
1448 // after we return... once I understand the final form for this
1449 // code I will re-visit this decision
1450 n
= this->send_message_block_chain_i (message_block
,
1456 // ... if this is just an EWOULDBLOCK we must schedule the
1457 // message for later, if it is ETIME we still have to send
1458 // the complete message, because cutting off the message at
1459 // this point will destroy the synchronization with the
1461 if (errno
!= EWOULDBLOCK
&& errno
!= ETIME
)
1463 if (TAO_debug_level
> 0)
1465 TAOLIB_ERROR ((LM_ERROR
,
1466 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
1467 ACE_TEXT ("fatal error in ")
1468 ACE_TEXT ("send_message_block_chain_i - %m\n"),
1475 // ... let's figure out if the complete message was sent ...
1476 if (total_length
== byte_count
)
1478 // Done, just return. Notice that there are no allocations
1479 // or copies up to this point (though some fancy calling
1481 // This is the common case for the critical path, it should
1488 partially_sent
= true;
1491 // If it was partially sent, then push to front of queue and don't flush
1492 if (n
== -1 && errno
== ETIME
)
1494 timeout_encountered
= true;
1495 if (byte_count
== 0)
1497 // This request has timed out and none of it was sent to the transport
1498 // We can't return -1 here, since that would end up closing the transport
1499 if (TAO_debug_level
> 2)
1501 TAOLIB_DEBUG ((LM_DEBUG
,
1502 ACE_TEXT ("TAO (%P|%t) - ")
1503 ACE_TEXT ("Transport[%d]::send_asynchronous_message_i, ")
1504 ACE_TEXT ("timeout encountered before any bytes sent\n"),
1507 throw ::CORBA::TIMEOUT (
1508 CORBA::SystemException::_tao_minor_code (
1509 TAO_TIMEOUT_SEND_MINOR_CODE
,
1511 CORBA::COMPLETED_NO
);
1515 if (TAO_debug_level
> 6)
1517 TAOLIB_DEBUG ((LM_DEBUG
,
1518 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
1519 ACE_TEXT ("partial send %d / %d bytes\n"),
1520 this->id (), byte_count
, total_length
));
1523 // ... part of the data was sent, need to figure out what piece
1524 // of the message block chain must be queued ...
1525 while (message_block
!= nullptr && message_block
->length () == 0)
1527 message_block
= message_block
->cont ();
1530 // ... at least some portion of the message block chain should
1534 // ... either the message must be queued or we need to queue it
1535 // because it was not completely sent out ...
// A partially sent remainder is queued without a timeout and with
// back == false; an unsent message keeps max_wait_time and back == true.
1537 ACE_Time_Value
*wait_time
= (partially_sent
? nullptr: max_wait_time
);
1538 if (this->queue_message_i (message_block
, wait_time
, !partially_sent
)
1541 if (TAO_debug_level
> 0)
1543 TAOLIB_DEBUG ((LM_DEBUG
,
1544 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::")
1545 ACE_TEXT ("send_asynchronous_message_i, ")
1546 ACE_TEXT ("cannot queue message for - %m\n"),
1552 if (TAO_debug_level
> 6)
1554 TAOLIB_DEBUG ((LM_DEBUG
,
1555 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
1556 ACE_TEXT ("message is queued\n"),
1560 if (timeout_encountered
&& partially_sent
)
1562 //Must close down the transport here since we can't guarantee the
1563 //integrity of the GIOP stream (the next send may try to write to
1564 //the socket before looking at the queue).
1565 if (TAO_debug_level
> 0)
1567 TAOLIB_DEBUG ((LM_DEBUG
,
1568 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::")
1569 ACE_TEXT ("send_asynchronous_message_i, ")
1570 ACE_TEXT ("timeout after partial send, closing.\n"),
1575 else if (!timeout_encountered
)
1577 // We can't flush if we have already encountered a timeout
1578 // ... if the queue is full we need to activate the output on the
1580 bool must_flush
= false;
1581 const bool constraints_reached
=
1582 this->check_buffering_constraints_i (stub
,
1585 // ... but we also want to activate it if the message was partially
1586 // sent.... Plus, when we use the blocking flushing strategy the
1587 // queue is flushed as a side-effect of 'schedule_output()'
1589 TAO_Flushing_Strategy
*flushing_strategy
=
1590 this->orb_core ()->flushing_strategy ();
1592 if (constraints_reached
|| try_sending_first
)
1594 int const result
= flushing_strategy
->schedule_output (this);
1595 if (result
== TAO_Flushing_Strategy::MUST_FLUSH
)
1603 if (TAO_debug_level
> 0)
1605 TAOLIB_DEBUG ((LM_DEBUG
,
1606 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::")
1607 ACE_TEXT ("send_asynchronous_message_i, ")
1608 ACE_TEXT ("flushing transport.\n"),
// Snapshot of sent_byte_count_ taken before flushing; it is compared
// below to detect a flush that moved no bytes.
1612 size_t const sent_byte
= sent_byte_count_
;
// The reverse lock releases handler_lock_ for the duration of the
// flush and re-acquires it when ace_mon goes out of scope.
1615 typedef ACE_Reverse_Lock
<ACE_Lock
> TAO_REVERSE_LOCK
;
1616 TAO_REVERSE_LOCK
reverse (*this->handler_lock_
);
1617 ACE_GUARD_RETURN (TAO_REVERSE_LOCK
, ace_mon
, reverse
, -1);
1618 ret
= flushing_strategy
->flush_transport (this, max_wait_time
);
1625 if (sent_byte
== sent_byte_count_
) // if nothing was actually flushed
1627 // This request has timed out and none of it was sent to the transport
1628 // We can't return -1 here, since that would end up closing the transport
1629 if (TAO_debug_level
> 2)
1631 TAOLIB_DEBUG ((LM_DEBUG
,
1632 ACE_TEXT ("TAO (%P|%t) - ")
1633 ACE_TEXT ("Transport[%d]::send_asynchronous_message_i, ")
1634 ACE_TEXT ("2 timeout encountered before any bytes sent\n"),
1637 throw ::CORBA::TIMEOUT (CORBA::SystemException::_tao_minor_code
1638 (TAO_TIMEOUT_SEND_MINOR_CODE
, ETIME
),
1639 CORBA::COMPLETED_NO
);
// Wraps message_block in a newly allocated TAO_Asynch_Queued_Message
// and links it into the outgoing queue (head_/tail_).
//
// @param message_block  Data to be queued.
// @param max_wait_time  Timeout associated with the queued message
//                       (presumably passed to the constructor on a
//                       line not visible here - confirm).
// @param back           Selects between the push_back () and
//                       push_front () calls below.
// NOTE(review): the condition choosing between the two pushes and the
// return statements are on lines missing from this excerpt.
1650 TAO_Transport::queue_message_i (const ACE_Message_Block
*message_block
,
1651 ACE_Time_Value
*max_wait_time
, bool back
)
1653 TAO_Queued_Message
*queued_message
= nullptr;
1654 ACE_NEW_RETURN (queued_message
,
1655 TAO_Asynch_Queued_Message (message_block
,
1662 queued_message
->push_back (this->head_
, this->tail_
);
1665 queued_message
->push_front (this->head_
, this->tail_
);
1672 * All the methods relevant to the incoming data path of the ORB are
// Entry point for readable-socket events.
//
// @param rh             Resume handle forwarded to the helpers.
// @param max_wait_time  Timeout forwarded to the read helpers.
//
// Order of work:
//  1. process_queue_head () is tried first so that an already queued
//     message gets served.
//  2. If the top of incoming_message_stack_ is a message whose missing
//     byte count is known, handle_input_missing_data () reads more of
//     it.
//  3. Otherwise handle_input_parse_data () reads and parses fresh
//     data.
// NOTE(review): the tests on retval and all return statements are on
// lines missing from this excerpt.
1676 TAO_Transport::handle_input (TAO_Resume_Handle
&rh
,
1677 ACE_Time_Value
* max_wait_time
)
1679 if (TAO_debug_level
> 3)
1681 TAOLIB_DEBUG ((LM_DEBUG
,
1682 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input\n"),
1686 // First try to process messages of the head of the incoming queue.
1687 int const retval
= this->process_queue_head (rh
);
1693 if (TAO_debug_level
> 2)
1695 TAOLIB_ERROR ((LM_ERROR
,
1696 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input, ")
1697 ACE_TEXT ("error while parsing the head of the queue\n"),
1706 // Processed a message in queue successfully. This
1707 // thread must return to thread-pool now.
1712 TAO_Queued_Data
*q_data
= nullptr;
1714 if (this->incoming_message_stack_
.top (q_data
) != -1
1715 && q_data
->missing_data () != TAO_MISSING_DATA_UNDEFINED
)
1717 /* PRE: q_data->missing_data_ > 0 as all QD on stack must be incomplete */
1718 if (this->handle_input_missing_data (rh
, max_wait_time
, q_data
) == -1)
1720 if (TAO_debug_level
> 0)
1722 TAOLIB_ERROR ((LM_ERROR
,
1723 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input, ")
1724 ACE_TEXT ("error consolidating incoming message\n"),
1732 if (this->handle_input_parse_data (rh
, max_wait_time
) == -1)
1734 if (TAO_debug_level
> 0)
1736 TAOLIB_ERROR ((LM_ERROR
,
1737 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input, ")
1738 ACE_TEXT ("error parsing incoming message\n"),
// Processes a completely received message, consolidating GIOP
// fragments first when necessary.
//
// @param q_data  Message to process; it must have no missing data
//                (a non-zero missing_data () is reported as an error).
// @param rh      Resume handle passed to process_parsed_messages ().
//
// Fragmented input (more_fragments () set or message type Fragment)
// goes through messaging_object ()->consolidate_fragmented_message ();
// any other message is handed to process_parsed_messages () directly.
// TAO_Queued_Data::release () is called on the processed object on
// both the error and the success path that are visible here.
// NOTE(review): return statements and some case labels are on lines
// missing from this excerpt.
1749 TAO_Transport::consolidate_process_message (TAO_Queued_Data
*q_data
,
1750 TAO_Resume_Handle
&rh
)
1753 if (q_data
->missing_data () != 0)
1755 if (TAO_debug_level
> 0)
1757 TAOLIB_ERROR ((LM_ERROR
,
1758 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ")
1759 ACE_TEXT ("missing data\n"),
1765 if (q_data
->more_fragments () ||
1766 q_data
->msg_type () == GIOP::Fragment
)
1768 // consolidate message on top of stack, only for fragmented messages
1769 TAO_Queued_Data
*new_q_data
= nullptr;
1771 switch (this->messaging_object()->consolidate_fragmented_message (q_data
, new_q_data
))
1776 case 0: // returning consolidated message in new_q_data
1779 if (TAO_debug_level
> 0)
1781 TAOLIB_ERROR ((LM_ERROR
,
1782 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ")
1783 ACE_TEXT ("error, consolidated message is NULL\n"),
1790 if (this->process_parsed_messages (new_q_data
, rh
) == -1)
1792 TAO_Queued_Data::release (new_q_data
);
1794 if (TAO_debug_level
> 0)
1796 TAOLIB_ERROR ((LM_ERROR
,
1797 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ")
1798 ACE_TEXT ("error processing consolidated message\n"),
1804 TAO_Queued_Data::release (new_q_data
);
1808 case 1: // fragment has been stored in messaging_object()
1814 if (this->process_parsed_messages (q_data
, rh
) == -1)
1816 TAO_Queued_Data::release (q_data
);
1818 if (TAO_debug_level
> 0)
1820 TAOLIB_ERROR ((LM_ERROR
,
1821 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ")
1822 ACE_TEXT ("error processing message\n"),
1828 TAO_Queued_Data::release (q_data
);
// Queues a completely received message on incoming_message_queue_ for
// later processing, consolidating GIOP fragments first when necessary.
//
// @param q_data  Message to enqueue; missing_data () is checked
//                against 0 before anything else.
// @return        0 on success (last visible statement); the error
//                returns are on lines missing from this excerpt.
//
// If enqueue_tail () fails, the object that could not be queued is
// released with TAO_Queued_Data::release ().
1835 TAO_Transport::consolidate_enqueue_message (TAO_Queued_Data
*q_data
)
1837 // consolidate message on top of stack, only for fragmented messages
1840 if (q_data
->missing_data () != 0)
1845 if (q_data
->more_fragments () ||
1846 q_data
->msg_type () == GIOP::Fragment
)
1848 TAO_Queued_Data
*new_q_data
= nullptr;
1850 switch (this->messaging_object()->consolidate_fragmented_message (q_data
, new_q_data
))
1855 case 0: // returning consolidated message in new_q_data
1858 if (TAO_debug_level
> 0)
1860 TAOLIB_ERROR ((LM_ERROR
,
1861 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_enqueue_message, ")
1862 ACE_TEXT ("error, consolidated message is NULL\n"),
1868 if (this->incoming_message_queue_
.enqueue_tail (new_q_data
) != 0)
1870 TAO_Queued_Data::release (new_q_data
);
1875 case 1: // fragment has been stored in messaging_object()
1881 if (this->incoming_message_queue_
.enqueue_tail (q_data
) != 0)
1883 TAO_Queued_Data::release (q_data
);
1888 return 0; // success
// Reads the remainder of a message whose outstanding byte count is
// already known.
//
// @param rh             Resume handle passed on when the completed
//                       message is processed.
// @param max_wait_time  Read timeout (presumably passed to recv () on
//                       a line not visible here - confirm).
// @param q_data         Partially received message; checked against
//                       nullptr first.
//
// The message block is grown with ACE_CDR::grow () when its free space
// is smaller than the missing byte count, the data is received at the
// block's write pointer, and the write pointer and missing_data () are
// advanced by the number of bytes read. Once missing_data () reaches
// 0 the message is popped from incoming_message_stack_ and handed to
// consolidate_process_message ().
// NOTE(review): the condition guarding the early
// "return truncate_cast<int> (n)" and the other return statements are
// on lines missing from this excerpt.
1892 TAO_Transport::handle_input_missing_data (TAO_Resume_Handle
&rh
,
1893 ACE_Time_Value
* max_wait_time
,
1894 TAO_Queued_Data
*q_data
)
1897 if (q_data
== nullptr)
1902 if (TAO_debug_level
> 3)
1904 TAOLIB_DEBUG ((LM_DEBUG
,
1905 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_missing_data_message, ")
1906 ACE_TEXT ("enter (missing data == %d)\n"),
1907 this->id (), q_data
->missing_data ()));
1910 size_t const recv_size
= q_data
->missing_data ();
1912 if (q_data
->msg_block ()->space() < recv_size
)
1914 // make sure the message_block has enough space
1915 size_t const message_size
= recv_size
+ q_data
->msg_block ()->length();
1917 if (ACE_CDR::grow (q_data
->msg_block (), message_size
) == -1)
1923 // Saving the size of the received buffer in case any one needs to
1924 // get the size of the message that is received in the
1925 // context. Obviously the value will be changed for each recv call
1926 // and the user is supposed to invoke the accessor only in the
1927 // invocation context to get meaningful information.
1928 this->recv_buffer_size_
= recv_size
;
1930 // Read the message into the existing message block on heap
1931 ssize_t
const n
= this->recv (q_data
->msg_block ()->wr_ptr(),
1937 return ACE_Utils::truncate_cast
<int> (n
);
1940 if (TAO_debug_level
> 3)
1942 TAOLIB_DEBUG ((LM_DEBUG
,
1943 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_missing_data_message, ")
1944 ACE_TEXT ("read bytes %d\n"),
1948 q_data
->msg_block ()->wr_ptr(n
);
1949 q_data
->missing_data (q_data
->missing_data () - n
);
1951 if (q_data
->missing_data () == 0)
1954 if (this->incoming_message_stack_
.pop (q_data
) == -1)
1959 if (this->consolidate_process_message (q_data
, rh
) == -1)
// Extracts every further message contained in message_block.
//
// @param message_block  Buffer holding zero or more additional
//                       messages; parsing continues while it has
//                       unread bytes.
//
// Each message returned by extract_next_message () is either enqueued
// through consolidate_enqueue_message () (missing_data () == 0) or, if
// incomplete, pushed on incoming_message_stack_.
// NOTE(review): the declaration of buf_status and the return
// statements are on lines missing from this excerpt.
1970 TAO_Transport::handle_input_parse_extra_messages (
1971 ACE_Message_Block
&message_block
)
1973 // store buffer status of last extraction: -1 parse error, 0
1974 // incomplete message header in buffer, 1 complete messages header
1978 TAO_Queued_Data
*q_data
= nullptr; // init
1980 // parse buffer until all messages have been extracted, consolidate
1981 // and enqueue complete messages, if the last message being parsed
1982 // has missing data, it stays on top of incoming_message_stack.
1983 while (message_block
.length () > 0 &&
1984 (buf_status
= this->messaging_object ()->extract_next_message
1985 (message_block
, q_data
)) != -1 &&
1986 q_data
!= nullptr) // paranoid check
1988 if (q_data
->missing_data () == 0)
1990 if (this->consolidate_enqueue_message (q_data
) == -1)
1995 else // incomplete message read, probably the last message in buffer
1998 this->incoming_message_stack_
.push (q_data
);
2001 q_data
= nullptr; // reset
2004 if (buf_status
== -1)
// Reads new data from the connection into a stack buffer and parses
// the messages it contains.
//
// @param rh             Resume handle; its flag is set below depending
//                       on whether further messages are queued.
// @param max_wait_time  Read timeout (presumably passed to recv () on
//                       a line not visible here - confirm).
//
// Phases, as far as they are visible in this excerpt:
//  1. A char buffer of TAO_MAXBUFSIZE + ACE_CDR::MAX_ALIGNMENT bytes
//     is wrapped in an ACE_Data_Block / ACE_Message_Block and aligned
//     with ACE_CDR::mb_align ().
//  2. recv_size is chosen: the whole free space with single-read
//     optimization, otherwise the GIOP header length or what is still
//     missing of a partially read header.
//  3. Bytes kept in partial_message_ from a previous call are copied
//     to the front of the buffer and deducted from recv_size.
//  4. After recv (), either the header-less message on top of
//     incoming_message_stack_ is consolidated (consolidate_node ()),
//     or the first message is parsed in place on the stack
//     (parse_next_message ()).
//  5. Bytes left unparsed in the buffer are copied to
//     partial_message_ for the next invocation.
// NOTE(review): return statements, several braces/else lines and some
// trailing call arguments are on lines missing from this excerpt.
2013 TAO_Transport::handle_input_parse_data (TAO_Resume_Handle
&rh
,
2014 ACE_Time_Value
* max_wait_time
)
2016 if (TAO_debug_level
> 3)
2018 TAOLIB_DEBUG ((LM_DEBUG
,
2019 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
2020 ACE_TEXT ("enter\n"),
2024 // The buffer on the stack which will be used to hold the input
2025 // messages, ACE_CDR::MAX_ALIGNMENT compensates the
2026 // memory-alignment. This improves performance with SUN-Java-ORB-1.4
2027 // and higher that sends fragmented requests of size 1024 bytes.
2028 char buf
[TAO_MAXBUFSIZE
+ ACE_CDR::MAX_ALIGNMENT
];
2030 #if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE)
2031 (void) ACE_OS::memset (buf
,
2034 #endif /* ACE_INITIALIZE_MEMORY_BEFORE_USE */
2036 // Create a data block
2037 ACE_Data_Block
db (sizeof (buf
),
2038 ACE_Message_Block::MB_DATA
,
2040 this->orb_core_
->input_cdr_buffer_allocator (),
2041 this->orb_core_
->locking_strategy (),
2042 ACE_Message_Block::DONT_DELETE
,
2043 this->orb_core_
->input_cdr_dblock_allocator ());
2045 // Create a message block
2046 ACE_Message_Block
message_block (&db
,
2047 ACE_Message_Block::DONT_DELETE
,
2048 this->orb_core_
->input_cdr_msgblock_allocator ());
2050 // Align the message block
2051 ACE_CDR::mb_align (&message_block
);
2053 size_t recv_size
= 0; // Note: unsigned integer
2055 // Pointer to newly parsed message
2056 TAO_Queued_Data
*q_data
= nullptr;
2058 // Optimizing access of constants
2059 size_t const header_length
= this->messaging_object ()->header_length ();
2062 if (header_length
> message_block
.space ())
2067 if (this->orb_core_
->orb_params ()->single_read_optimization ())
2069 recv_size
= message_block
.space ();
2073 // Single read optimization has been de-activated. That means
2074 // that we need to read from transport the GIOP header first
2075 // before the payload. This codes first checks the incoming
2076 // stack for partial messages which needs to be
2077 // consolidated. Otherwise we are in new cycle, reading complete
2078 // GIOP header of new incoming message.
2079 if (this->incoming_message_stack_
.top (q_data
) != -1
2080 && q_data
->missing_data () == TAO_MISSING_DATA_UNDEFINED
)
2082 // There is a partial message on incoming_message_stack_
2083 // whose length is unknown so far. We need to consolidate
2084 // the GIOP header to get to know the payload size,
2085 recv_size
= header_length
- q_data
->msg_block ()->length ();
2089 // Read amount of data forming GIOP header of new incoming
2091 recv_size
= header_length
;
2093 // POST: 0 <= recv_size <= header_length
2095 // POST: 0 <= recv_size <= message_block->space ()
2097 // If we have a partial message, copy it into our message block and
2098 // clear out the partial message.
2099 if (this->partial_message_
!= nullptr && this->partial_message_
->length () > 0)
2101 // (*) Copy back the partial message into current read-buffer,
2102 // verify that the read-strategy of "recv_size" bytes is not
2103 // exceeded. The latter check guarantees that recv_size does not
2104 // roll-over and keeps in range
2105 // 0<=recv_size<=message_block->space()
2106 if (this->partial_message_
->length () <= recv_size
&&
2107 message_block
.copy (this->partial_message_
->rd_ptr (),
2108 this->partial_message_
->length ()) == 0)
2110 recv_size
-= this->partial_message_
->length ();
2111 // reset is done later to avoid problem in case of EWOULDBLOCK
2119 // POST: 0 <= recv_size <= buffer_space
2121 if (0 >= recv_size
) // paranoid: the check above (*) guarantees recv_size>=0
2123 // This event would cause endless looping, trying frequently to
2124 // read zero bytes from stream. This might happen, if TAO's
2125 // protocol implementation is not correct and tries to read data
2126 // beyond header without "single_read_optimization" being
2128 if (TAO_debug_level
> 0)
2130 TAOLIB_ERROR ((LM_ERROR
,
2131 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
2132 ACE_TEXT ("Error - endless loop detection, closing connection"),
2135 if (this->partial_message_
!= nullptr && this->partial_message_
->length () > 0)
2137 this->partial_message_
->reset ();
2142 // Saving the size of the received buffer in case any one needs to
2143 // get the size of the message that is received in the
2144 // context. Obviously the value will be changed for each recv call
2145 // and the user is supposed to invoke the accessor only in the
2146 // invocation context to get meaningful information.
2147 this->recv_buffer_size_
= recv_size
;
2149 // Read the message into the message block that we have created on
2151 ssize_t
const n
= this->recv (message_block
.wr_ptr (),
2155 // If there is an error return to the reactor..
2156 // do not reset partial message in case of n == 0 (EWOULDBLOCK || EAGAIN),
2157 // we will need it during next try
2161 (this->partial_message_
!= nullptr && this->partial_message_
->length () > 0))
2163 this->partial_message_
->reset ();
2166 return ACE_Utils::truncate_cast
<int> (n
);
2169 if (this->partial_message_
!= nullptr && this->partial_message_
->length () > 0)
2171 this->partial_message_
->reset ();
2174 if (TAO_debug_level
> 3)
2176 TAOLIB_DEBUG ((LM_DEBUG
,
2177 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
2178 ACE_TEXT ("read %d bytes\n"),
2182 // Set the write pointer in the stack buffer
2183 message_block
.wr_ptr (n
);
2186 // STACK PROCESSING OR MESSAGE CONSOLIDATION
2189 // PRE: data in buffer is aligned && message_block.length() > 0
2191 if (this->incoming_message_stack_
.top (q_data
) != -1
2192 && q_data
->missing_data () == TAO_MISSING_DATA_UNDEFINED
)
2195 // MESSAGE CONSOLIDATION
2198 // Partial message on incoming_message_stack_ needs to be
2199 // consolidated. The message header could not be parsed so far
2200 // and therefore the message size is unknown yet. Consolidating
2201 // the message destroys the memory alignment of succeeding
2202 // messages sharing the buffer, for that reason consolidation
2203 // and stack based processing are mutually exclusive.
2204 if (this->messaging_object ()->consolidate_node (q_data
,
2205 message_block
) == -1)
2207 if (TAO_debug_level
> 0)
2209 TAOLIB_ERROR ((LM_ERROR
,
2210 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
2211 ACE_TEXT ("error consolidating message from input buffer\n"),
2217 // Complete messages are to be enqueued and later processed
2218 if (q_data
->missing_data () == 0)
2220 if (this->incoming_message_stack_
.pop (q_data
) == -1)
2225 if (this->consolidate_enqueue_message (q_data
) == -1)
2231 if (message_block
.length () > 0
2232 && this->handle_input_parse_extra_messages (message_block
) == -1)
2237 // In any case try to process the enqueued messages
2238 if (this->process_queue_head (rh
) == -1)
2246 // STACK PROCESSING (critical path)
2249 // Process the first message in buffer on stack
2251 // (PRE: first message resides in aligned memory) Make a node of
2252 // the message-block..
2254 TAO_Queued_Data
qd (&message_block
,
2255 this->orb_core_
->transport_message_buffer_allocator ());
2257 size_t mesg_length
= 0;
2259 if (this->messaging_object ()->parse_next_message (qd
, mesg_length
) == -1
2260 || (qd
.missing_data () == 0
2261 && mesg_length
> message_block
.length ()) )
2263 // extracting message failed
2266 // POST: qd.missing_data_ == 0 --> mesg_length <= message_block.length()
2267 // This prevents seeking rd_ptr behind the wr_ptr
2269 if (qd
.missing_data () != 0 ||
2270 qd
.more_fragments () ||
2271 qd
.msg_type () == GIOP::Fragment
)
2273 if (qd
.missing_data () == 0)
2275 // Dealing with a fragment
2276 TAO_Queued_Data
*nqd
= TAO_Queued_Data::duplicate (qd
);
2283 // mark the end of message in new buffer
2284 char* end_mark
= nqd
->msg_block ()->rd_ptr ()
2286 nqd
->msg_block ()->wr_ptr (end_mark
);
2288 // move the read pointer forward in old buffer
2289 message_block
.rd_ptr (mesg_length
);
2291 // enqueue the message
2292 if (this->consolidate_enqueue_message (nqd
) == -1)
2297 if (message_block
.length () > 0
2298 && this->handle_input_parse_extra_messages (message_block
) == -1)
2303 // In any case try to process the enqueued messages
2304 if (this->process_queue_head (rh
) == -1)
2309 else if (qd
.missing_data () != TAO_MISSING_DATA_UNDEFINED
)
2311 // Incomplete message, must be the last one in buffer
2313 if (qd
.missing_data () != TAO_MISSING_DATA_UNDEFINED
&&
2314 qd
.missing_data () > message_block
.space ())
2316 // Re-Allocate correct size on heap
2317 if (ACE_CDR::grow (qd
.msg_block (),
2318 message_block
.length ()
2319 + qd
.missing_data ()) == -1)
2325 TAO_Queued_Data
*nqd
= TAO_Queued_Data::duplicate (qd
);
2332 // move read-pointer to end of buffer
2333 message_block
.rd_ptr (message_block
.length());
2335 this->incoming_message_stack_
.push (nqd
);
2344 // We can't process the message on stack right now. First we
2345 // have got to parse extra messages from message_block,
2346 // putting them into queue. When this is done we can return
2347 // to process this message, and notifying other threads to
2348 // process the messages in queue.
2349 char * end_marker
= message_block
.rd_ptr ()
2352 if (message_block
.length () > mesg_length
)
2354 // There are more messages in data stream to be parsed.
2355 // Save the rd_ptr to restore later.
2356 char *rd_ptr_stack_mesg
= message_block
.rd_ptr ();
2358 // Skip parsed message, jump to next message in buffer
2359 // PRE: mesg_length <= message_block.length ()
2360 message_block
.rd_ptr (mesg_length
);
2362 // Extract remaining messages and enqueue them for later
2364 if (this->handle_input_parse_extra_messages (message_block
) == -1)
2369 // correct the wr_ptr using the end_marker to point to the
2370 // end of the first message else the code after this will
2371 // see the full stream with all the messages
2372 message_block
.wr_ptr (end_marker
);
2375 message_block
.rd_ptr (rd_ptr_stack_mesg
);
2378 // The following if-else has been copied from
2379 // process_queue_head(). While process_queue_head()
2380 // processes message on heap, here we will process a message
2383 // Now that we have one message on stack to be processed,
2384 // check whether we have one more message in the queue...
2385 if (this->incoming_message_queue_
.queue_length () > 0)
2387 if (TAO_debug_level
> 0)
2389 TAOLIB_DEBUG ((LM_DEBUG
,
2390 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
2391 ACE_TEXT ("notify reactor\n"),
2395 int const retval
= this->notify_reactor ();
2399 // Let the class know that it doesn't need to resume the
2401 rh
.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED
);
2403 else if (retval
< 0)
2408 // As there are no further messages in queue just resume
2409 // the handle. Set the flag in case someone had reset the flag..
2410 rh
.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE
);
2413 // PRE: incoming_message_queue is empty
2414 if (this->process_parsed_messages (&qd
, rh
) == -1)
2418 // move the rd_ptr to position of end_marker
2419 message_block
.rd_ptr (end_marker
);
2423 // Now that all cases have been processed, there might be kept some data
2424 // in buffer that needs to be saved for next "handle_input" invocations.
2425 if (message_block
.length () > 0)
2427 if (this->partial_message_
== nullptr)
2429 this->allocate_partial_message_block ();
2432 if (this->partial_message_
!= nullptr &&
2433 this->partial_message_
->copy (message_block
.rd_ptr (),
2434 message_block
.length ()) == 0)
2436 message_block
.rd_ptr (message_block
.length ());
// Acts on one completely parsed GIOP message according to its type.
//
// @param qd  Parsed message; its msg_type () selects the branch.
// @param rh  Resume handle; resumed before request and reply
//            processing.
//
// Branches visible in this excerpt:
//   CloseConnection -> logged; the comment says -1 is returned so the
//                      next stage closes the connection.
//   LocateRequest   -> handle resumed, process_request_message ().
//   LocateReply     -> handle resumed, process_reply_message () with a
//                      TAO_Pluggable_Reply_Params built on this
//                      transport.
//   CancelRequest   -> discard_fragmented_message (); processing then
//                      continues.
//   MessageError    -> logged as "closing connection".
//   Fragment        -> nothing to do.
// With TAO_HAS_TRANSPORT_CURRENT == 1 the message length is reported
// to stats_->messages_received () first.
// NOTE(review): further case labels (probably sharing the
// LocateRequest/LocateReply bodies) and all return statements are on
// lines missing from this excerpt - confirm against the full file.
2449 TAO_Transport::process_parsed_messages (TAO_Queued_Data
*qd
,
2450 TAO_Resume_Handle
&rh
)
2452 if (TAO_debug_level
> 7)
2454 TAOLIB_DEBUG ((LM_DEBUG
,
2455 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
2456 ACE_TEXT ("entering (missing data == %d)\n"),
2457 this->id(), qd
->missing_data ()));
2460 #if TAO_HAS_TRANSPORT_CURRENT == 1
2461 // Update stats, if any
2462 if (this->stats_
!= nullptr)
2463 this->stats_
->messages_received (qd
->msg_block ()->length ());
2464 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */
2466 switch (qd
->msg_type ())
2468 case GIOP::CloseConnection
:
2470 if (TAO_debug_level
> 0)
2472 TAOLIB_DEBUG ((LM_DEBUG
,
2473 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
2474 ACE_TEXT ("received CloseConnection message - %m\n"),
2478 // Return a "-1" so that the next stage can take care of
2479 // closing connection and the necessary memory management.
2484 case GIOP::LocateRequest
:
2486 // Let us resume the handle before we go ahead to process the
2487 // request. This will open up the handle for other threads.
2488 rh
.resume_handle ();
2490 if (this->messaging_object ()->process_request_message (this, qd
) == -1)
2492 // Return a "-1" so that the next stage can take care of
2493 // closing connection and the necessary memory management.
2499 case GIOP::LocateReply
:
2501 rh
.resume_handle ();
2503 TAO_Pluggable_Reply_Params
params (this);
2505 if (this->messaging_object ()->process_reply_message (params
, qd
) == -1)
2507 if (TAO_debug_level
> 0)
2509 TAOLIB_ERROR ((LM_ERROR
,
2510 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
2511 ACE_TEXT ("error in process_reply_message - %m\n"),
2520 case GIOP::CancelRequest
:
2522 // The associated request might be incomplete residing
2523 // fragmented in messaging object. We must make sure the
2524 // resources allocated by fragments are released.
2525 if (this->messaging_object ()->discard_fragmented_message (qd
) == -1)
2527 if (TAO_debug_level
> 0)
2529 TAOLIB_ERROR ((LM_ERROR
,
2530 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
2531 ACE_TEXT ("error processing CancelRequest\n"),
2536 // We are not able to cancel requests being processed already;
2537 // this is declared as optional feature by CORBA, and TAO does
2538 // not support this currently.
2540 // Just continue processing, CancelRequest does not mean to cut
2541 // off the connection.
2544 case GIOP::MessageError
:
2546 if (TAO_debug_level
> 0)
2548 TAOLIB_ERROR ((LM_ERROR
,
2549 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
2550 ACE_TEXT ("received MessageError, closing connection\n"),
2556 case GIOP::Fragment
:
2558 // Nothing to be done.
2563 // If not, just return back..
// Processes the message at the head of the incoming message queue, if
// there is one.
//
// @param rh  Resume handle whose flag is adjusted here and which is passed
//            on to process_parsed_messages ().
//
// A message is dequeued and handed to process_parsed_messages (); the
// queued data is released afterwards. When further messages remain queued
// the reactor is notified via notify_reactor ().
// NOTE(review): the value returned to the caller is presumably the result
// of process_parsed_messages () - confirm.
2568 TAO_Transport::process_queue_head (TAO_Resume_Handle
&rh
)
2570 if (TAO_debug_level
> 3)
2572 TAOLIB_DEBUG ((LM_DEBUG
,
2573 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_queue_head, %d enqueued\n"),
2574 this->id (), this->incoming_message_queue_
.queue_length () ));
2577 // See if there is a message in the queue ...
2578 if (this->incoming_message_queue_
.queue_length () > 0)
2580 // Get the message on the head of the queue..
2581 TAO_Queued_Data
*qd
=
2582 this->incoming_message_queue_
.dequeue_head ();
2584 if (TAO_debug_level
> 3)
2586 TAOLIB_DEBUG ((LM_DEBUG
,
2587 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_queue_head, ")
2588 ACE_TEXT ("the size of the queue is [%d]\n"),
2590 this->incoming_message_queue_
.queue_length()));
2592 // Now that we have pulled one message out of the queue,
2593 // check whether we have one more message in the queue...
2594 if (this->incoming_message_queue_
.queue_length () > 0)
2596 if (TAO_debug_level
> 0)
2598 TAOLIB_DEBUG ((LM_DEBUG
,
2599 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_queue_head, ")
2600 ACE_TEXT ("notify reactor\n"),
// More messages are pending: notify the reactor.
2604 int const retval
= this->notify_reactor ();
2608 // Let the class know that it doesn't need to resume the
2610 rh
.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED
);
2612 else if (retval
< 0)
2617 // As we are ready to process the last message just resume
2618 // the handle. Set the flag in case someone had reset the flag.
2619 rh
.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE
);
2622 // Process the message...
2623 int const retval
= this->process_parsed_messages (qd
, rh
);
2625 // Delete the Queued_Data..
2626 TAO_Queued_Data::release (qd
);
// Sends a READ_MASK notification for this transport's event handler to
// the reactor of the ORB core. A negative result from ACE_Reactor::notify ()
// is logged as an error when TAO_debug_level > 2.
// NOTE(review): the log messages below name "notify_reactor" although this
// function is notify_reactor_now () - confirm whether that is intended.
2635 TAO_Transport::notify_reactor_now ()
2637 ACE_Event_Handler
*eh
= this->event_handler_i ();
2639 // Get the reactor associated with the event handler
2640 ACE_Reactor
*reactor
= this->orb_core ()->reactor ();
2642 if (TAO_debug_level
> 0)
2644 TAOLIB_DEBUG ((LM_DEBUG
,
2645 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::notify_reactor, ")
2646 ACE_TEXT ("notify to Reactor\n"),
2650 // Send a notification to the reactor...
2651 int const retval
= reactor
->notify (eh
, ACE_Event_Handler::READ_MASK
);
2653 if (retval
< 0 && TAO_debug_level
> 2)
2655 // @todo: need to think about what is the action that
2656 // we can take when we get here.
2657 TAOLIB_ERROR ((LM_ERROR
,
2658 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::notify_reactor, ")
2659 ACE_TEXT ("notify to the reactor failed..\n"),
2666 TAO::Transport_Cache_Manager
&
2667 TAO_Transport::transport_cache_manager ()
2669 return this->orb_core_
->lane_resources ().transport_cache ();
// Assigns this transport's codeset translators to the given CDR streams.
//
// @param inp   Input CDR stream passed to the translators' assign ().
// @param outp  Output CDR stream passed to the translators' assign ().
//
// Each translator (char and wchar) is only used when it is set.
2673 TAO_Transport::assign_translators (TAO_InputCDR
*inp
, TAO_OutputCDR
*outp
)
// Narrow character translator, if any.
2675 if (this->char_translator_
)
2677 this->char_translator_
->assign (inp
);
2678 this->char_translator_
->assign (outp
);
// Wide character translator, if any.
2680 if (this->wchar_translator_
)
2682 this->wchar_translator_
->assign (inp
);
2683 this->wchar_translator_
->assign (outp
);
// Resets the char and wchar translators of the given CDR streams to
// nullptr.
//
// @param inp   Input CDR stream whose translators are cleared.
// @param outp  Output CDR stream whose translators are cleared.
2688 TAO_Transport::clear_translators (TAO_InputCDR
*inp
, TAO_OutputCDR
*outp
)
2692 inp
->char_translator (nullptr);
2693 inp
->wchar_translator (nullptr);
2697 outp
->char_translator (nullptr);
2698 outp
->wchar_translator (nullptr);
2702 ACE_Event_Handler::Reference_Count
2703 TAO_Transport::add_reference ()
2705 return this->event_handler_i ()->add_reference ();
2708 ACE_Event_Handler::Reference_Count
2709 TAO_Transport::remove_reference ()
2711 return this->event_handler_i ()->remove_reference ();
// Forwards to the messaging object's out_stream () and returns its result.
2715 TAO_Transport::out_stream ()
2717 return this->messaging_object ()->out_stream ();
// Accessor for the output_cdr_mutex_ member.
2721 TAO_Transport::output_cdr_lock ()
2723 return this->output_cdr_mutex_
;
// Initializes the messaging object with the major and minor numbers of
// the given GIOP version.
//
// @param version  GIOP version whose major/minor fields are passed to
//                 the messaging object's init ().
2727 TAO_Transport::messaging_init (TAO_GIOP_Message_Version
const &version
)
2729 this->messaging_object ()->init (version
.major
, version
.minor
);
// Prepares the transport for being closed: clears is_connected_, updates
// the connected state kept by the transport cache manager for
// cache_map_entry_, purges the cache entry and finally calls
// cleanup_queue_i () while holding handler_lock_.
2733 TAO_Transport::pre_close ()
2735 if (TAO_debug_level
> 9)
2737 TAOLIB_DEBUG ((LM_DEBUG
, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::pre_close\n"),
2740 // @TODO: something needs to be done with is_connected_. Checking it is
2741 // guarded by a mutex, but setting it is not. Until the need for mutexed
2742 // protection is required, the transport cache is holding its own copy
2743 // of the is_connected_ flag, so that during cache lookups the cache
2744 // manager doesn't need to be burdened by the lock in is_connected().
2745 this->is_connected_
= false;
2746 this->transport_cache_manager ().mark_connected (this->cache_map_entry_
,
2748 this->purge_entry ();
// The queue cleanup runs under handler_lock_.
2750 ACE_MT (ACE_GUARD (ACE_Lock
, guard
, *this->handler_lock_
));
2751 this->cleanup_queue_i ();
// Finishes opening the transport.
//
// @param id  New transport id; it is logged together with the current id_.
//
// If the outgoing queue is not empty the transport is registered through
// the wait strategy and, when flush_in_post_open_ is set, output is
// scheduled with the ORB core's flushing strategy. A failed registration
// purges the cache entry and closes the connection. Afterwards
// is_connected_ is set under handler_lock_, the cache manager's connected
// state for cache_map_entry_ is updated and the entry state becomes
// TAO::ENTRY_IDLE_AND_PURGABLE.
//
// Returns false when handler_lock_ cannot be acquired (ACE_GUARD_RETURN).
// Throws CORBA::INTERNAL when no flushing strategy is available.
2756 TAO_Transport::post_open (size_t id
)
2758 if (TAO_debug_level
> 9)
2760 TAOLIB_DEBUG ((LM_DEBUG
, ACE_TEXT ("TAO (%P|%t) - Transport::post_open, ")
2761 ACE_TEXT ("transport id changed from [%d] to [%d]\n"), this->id_
, id
));
2765 // When we have data in our outgoing queue schedule ourselves
2767 if (!this->queue_is_empty_i ())
2769 // If the wait strategy wants us to be registered with the reactor
2770 // then we do so. If registration is required and it succeeds,
2771 // #REFCOUNT# becomes two.
2772 if (this->wait_strategy ()->register_handler () == 0)
2774 if (this->flush_in_post_open_
)
2776 TAO_Flushing_Strategy
*flushing_strategy
=
2777 this->orb_core ()->flushing_strategy ();
2779 if (flushing_strategy
== nullptr)
2780 throw CORBA::INTERNAL ();
// Clear the flag before scheduling; the scheduling result is ignored.
2782 this->flush_in_post_open_
= false;
2783 (void)flushing_strategy
->schedule_output (this);
2788 // Registration failures.
2790 // Purge from the connection cache, if we are not in the cache, this
2791 // just does nothing.
2792 (void) this->purge_entry ();
2794 // Close the handler.
2795 (void) this->close_connection ();
2797 if (TAO_debug_level
> 0)
2799 TAOLIB_ERROR ((LM_ERROR
,
2800 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::post_open , ")
2801 ACE_TEXT ("could not register the transport ")
2802 ACE_TEXT ("in the reactor.\n"),
// is_connected_ is set while holding handler_lock_.
2811 ACE_GUARD_RETURN (ACE_Lock
, ace_mon
, *this->handler_lock_
, false);
2812 this->is_connected_
= true;
2815 if (TAO_debug_level
> 9)
2817 TAOLIB_DEBUG ((LM_DEBUG
, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::post_open")
2818 ACE_TEXT (", cache_map_entry_ is [%@]\n"), this->id_
, this->cache_map_entry_
));
2821 this->transport_cache_manager ().mark_connected (this->cache_map_entry_
,
2824 // update transport cache to make this entry available
2825 this->transport_cache_manager ().set_entry_state (
2826 this->cache_map_entry_
,
2827 TAO::ENTRY_IDLE_AND_PURGABLE
);
// Allocates partial_message_ on first use. The block is sized to the
// messaging object's header_length (); nothing happens when
// partial_message_ is already allocated.
2833 TAO_Transport::allocate_partial_message_block ()
2835 if (this->partial_message_
== nullptr)
2837 // This value must be at least large enough to hold a GIOP message
2838 // header plus a GIOP fragment header
2839 size_t const partial_message_size
=
2840 this->messaging_object ()->header_length ();
2841 // + this->messaging_object ()->fragment_header_length ();
2842 // deprecated, conflicts with not-single_read_opt.
2844 ACE_NEW (this->partial_message_
,
2845 ACE_Message_Block (partial_message_size
));
// Base-class version of set_bidir_context_info (). The
// TAO_Operation_Details argument is unnamed and therefore not used here.
2850 TAO_Transport::set_bidir_context_info (TAO_Operation_Details
&)
// Selects the timeout to be used for an I/O operation.
//
// @param dc  Drain constraints providing block_on_io () and timeout ().
//
// Returns dc.timeout () when dc.block_on_io () is true. Otherwise a wait
// strategy that can process upcalls is handled by a separate branch, and
// dc.timeout () is returned in the remaining case.
2854 ACE_Time_Value
const *
2855 TAO_Transport::io_timeout(
2856 TAO::Transport::Drain_Constraints
const & dc
) const
2858 if (dc
.block_on_io())
2860 return dc
.timeout();
2862 if (this->wait_strategy()->can_process_upcalls())
2866 return dc
.timeout();
// Tells whether blocking I/O is used for synchronous messages; the answer
// depends on whether the wait strategy can process upcalls.
2870 TAO_Transport::using_blocking_io_for_synch_messages () const
2872 if (this->wait_strategy()->can_process_upcalls())
// Counterpart of using_blocking_io_for_synch_messages () for asynchronous
// messages. NOTE(review): presumably a constant answer - confirm.
2880 TAO_Transport::using_blocking_io_for_asynch_messages () const
// Accessor for the connection_closed_on_read_ member.
2886 TAO_Transport::connection_closed_on_read () const
2888 return connection_closed_on_read_
;
2891 TAO_END_VERSIONED_NAMESPACE_DECL