Revert "Minor modernization of DynamicAny code"
[ACE_TAO.git] / TAO / tao / Transport.cpp
bloba63904aef9fe92d70d103ac9e964f32b95685a3f
1 #include "tao/Transport.h"
3 #include "tao/LF_Follower.h"
4 #include "tao/Leader_Follower.h"
5 #include "tao/Client_Strategy_Factory.h"
6 #include "tao/Wait_Strategy.h"
7 #include "tao/Transport_Mux_Strategy.h"
8 #include "tao/Stub.h"
9 #include "tao/Transport_Queueing_Strategies.h"
10 #include "tao/Connection_Handler.h"
11 #include "tao/GIOP_Message_Base.h"
12 #include "tao/Synch_Queued_Message.h"
13 #include "tao/Asynch_Queued_Message.h"
14 #include "tao/Flushing_Strategy.h"
15 #include "tao/Thread_Lane_Resources.h"
16 #include "tao/Resume_Handle.h"
17 #include "tao/Codeset_Manager.h"
18 #include "tao/Codeset_Translator_Base.h"
19 #include "tao/debug.h"
20 #include "tao/CDR.h"
21 #include "tao/ORB_Core.h"
22 #include "tao/MMAP_Allocator.h"
23 #include "tao/SystemException.h"
24 #include "tao/operation_details.h"
25 #include "tao/Transport_Descriptor_Interface.h"
26 #include "tao/ORB_Time_Policy.h"
28 #include "ace/OS_NS_sys_time.h"
29 #include "ace/OS_NS_stdio.h"
30 #include "ace/Reactor.h"
31 #include "ace/os_include/sys/os_uio.h"
32 #include "ace/High_Res_Timer.h"
33 #include "ace/CORBA_macros.h"
34 #include "ace/Truncate.h"
36 #if !defined (__ACE_INLINE__)
37 # include "tao/Transport.inl"
38 #endif /* __ACE_INLINE__ */
// Static function in file scope
43 static void
44 dump_iov (iovec *iov, int iovcnt, size_t id,
45 size_t current_transfer,
46 const ACE_TCHAR *location)
48 ACE_GUARD (ACE_Log_Msg, ace_mon, *ACE_Log_Msg::instance ());
50 TAOLIB_DEBUG ((LM_DEBUG,
51 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::%s, ")
52 ACE_TEXT ("sending %d buffers\n"),
53 id, location, iovcnt));
55 for (int i = 0; i != iovcnt && 0 < current_transfer; ++i)
57 size_t iov_len = iov[i].iov_len;
59 // Possibly a partially sent iovec entry.
60 if (current_transfer < iov_len)
62 iov_len = current_transfer;
65 TAOLIB_DEBUG ((LM_DEBUG,
66 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::%s, ")
67 ACE_TEXT ("buffer %d/%d has %d bytes\n"),
68 id, location,
69 i, iovcnt,
70 iov_len));
72 size_t len;
74 for (size_t offset = 0; offset < iov_len; offset += len)
76 ACE_TCHAR header[1024];
77 ACE_OS::sprintf (header,
78 ACE_TEXT("TAO - ")
79 ACE_TEXT("Transport[")
80 ACE_SIZE_T_FORMAT_SPECIFIER
81 ACE_TEXT("]::%s")
82 ACE_TEXT(" (")
83 ACE_SIZE_T_FORMAT_SPECIFIER ACE_TEXT("/")
84 ACE_SIZE_T_FORMAT_SPECIFIER ACE_TEXT(")"),
85 id, location, offset, iov_len);
87 len = iov_len - offset;
89 if (len > 512)
91 len = 512;
94 TAOLIB_HEX_DUMP ((LM_DEBUG,
95 static_cast<char*> (iov[i].iov_base) + offset,
96 len,
97 header));
99 current_transfer -= iov_len;
102 TAOLIB_DEBUG ((LM_DEBUG,
103 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::%s, ")
104 ACE_TEXT ("end of data\n"),
105 id, location));
108 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
110 #if TAO_HAS_TRANSPORT_CURRENT == 1
111 TAO::Transport::Stats::~Stats ()
114 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */
116 TAO_Transport::TAO_Transport (CORBA::ULong tag,
117 TAO_ORB_Core *orb_core,
118 size_t input_cdr_size)
119 : tag_ (tag)
120 , orb_core_ (orb_core)
121 , cache_map_entry_ (nullptr)
122 , tms_ (nullptr)
123 , ws_ (nullptr)
124 , bidirectional_flag_ (-1)
125 , opening_connection_role_ (TAO::TAO_UNSPECIFIED_ROLE)
126 , head_ (nullptr)
127 , tail_ (nullptr)
128 , incoming_message_queue_ (orb_core)
129 , current_deadline_ (ACE_Time_Value::zero)
130 , flush_timer_id_ (-1)
131 , transport_timer_ (this)
132 , handler_lock_ (orb_core->resource_factory ()->create_cached_connection_lock ())
133 , id_ ((size_t) this)
134 , purging_order_ (0)
135 , recv_buffer_size_ (0)
136 , sent_byte_count_ (0)
137 , is_connected_ (false)
138 , connection_closed_on_read_ (false)
139 , messaging_object_ (nullptr)
140 , char_translator_ (nullptr)
141 , wchar_translator_ (nullptr)
142 , tcs_set_ (0)
143 , first_request_ (true)
144 , partial_message_ (nullptr)
145 #if TAO_HAS_SENDFILE == 1
146 // The ORB has been configured to use the MMAP allocator, meaning
147 // we could/should use sendfile() to send data. Cast once rather
148 // here rather than during each send. This assumes that all
149 // TAO_OutputCDR instances are using the same TAO_MMAP_Allocator
150 // instance as the underlying output CDR buffer allocator.
151 , mmap_allocator_ (
152 dynamic_cast<TAO_MMAP_Allocator *> (
153 orb_core->output_cdr_buffer_allocator ()))
154 #endif /* TAO_HAS_SENDFILE==1 */
155 #if TAO_HAS_TRANSPORT_CURRENT == 1
156 , stats_ (nullptr)
157 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */
158 , flush_in_post_open_ (false)
160 ACE_NEW (this->messaging_object_,
161 TAO_GIOP_Message_Base (orb_core,
162 this,
163 input_cdr_size));
165 TAO_Client_Strategy_Factory *cf =
166 this->orb_core_->client_factory ();
168 // Create WS now.
169 this->ws_ = cf->create_wait_strategy (this);
171 // Create TMS now.
172 this->tms_ = cf->create_transport_mux_strategy (this);
174 #if TAO_HAS_TRANSPORT_CURRENT == 1
175 // Allocate stats
176 ACE_NEW_THROW_EX (this->stats_,
177 TAO::Transport::Stats,
178 CORBA::NO_MEMORY ());
179 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */
182 TAO_Transport::~TAO_Transport ()
184 if (TAO_debug_level > 9)
186 TAOLIB_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::~Transport\n"),
187 this->id_));
190 delete this->messaging_object_;
192 delete this->ws_;
194 delete this->tms_;
196 delete this->handler_lock_;
198 if (!this->is_connected_)
200 // When we have a not connected transport we could have buffered
201 // messages on this transport which we have to cleanup now.
202 this->cleanup_queue_i();
205 // Release the partial message block, however we may
206 // have never allocated one.
207 ACE_Message_Block::release (this->partial_message_);
209 // By the time the destructor is reached here all the connection stuff
210 // *must* have been cleaned up.
212 // The following assert is needed for the test "Bug_2494_Regression".
213 // See the bugzilla bug #2494 for details.
214 ACE_ASSERT (this->queue_is_empty_i ());
215 ACE_ASSERT (this->cache_map_entry_ == nullptr);
217 #if TAO_HAS_TRANSPORT_CURRENT == 1
218 delete this->stats_;
219 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */
222 void
223 TAO_Transport::provide_handler (TAO::Connection_Handler_Set &handlers)
225 (void) this->add_reference ();
227 handlers.insert (this->connection_handler_i ());
230 bool
231 TAO_Transport::provide_blockable_handler (TAO::Connection_Handler_Set &h)
233 if (this->ws_->non_blocking () ||
234 this->opening_connection_role_ == TAO::TAO_SERVER_ROLE)
235 return false;
237 (void) this->add_reference ();
239 h.insert (this->connection_handler_i ());
241 return true;
244 bool
245 TAO_Transport::idle_after_send ()
247 return this->tms ()->idle_after_send ();
250 bool
251 TAO_Transport::idle_after_reply ()
253 return this->tms ()->idle_after_reply ();
257 TAO_Transport::tear_listen_point_list (TAO_InputCDR &)
259 ACE_NOTSUP_RETURN (-1);
263 TAO_Transport::send_message_shared (TAO_Stub *stub,
264 TAO_Message_Semantics message_semantics,
265 const ACE_Message_Block *message_block,
266 ACE_Time_Value *max_wait_time)
268 int result = 0;
271 ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1);
273 result =
274 this->send_message_shared_i (stub, message_semantics,
275 message_block, max_wait_time);
278 if (result == -1)
280 // The connection needs to be closed here.
281 // In the case of a partially written message this is the only way to cleanup
282 // the physical connection as well as the Transport. An EOF on the remote end
283 // will cancel the partially received message.
284 this->close_connection ();
287 return result;
290 bool
291 TAO_Transport::post_connect_hook ()
293 return true;
296 bool
297 TAO_Transport::register_if_necessary ()
299 if (this->is_connected_ &&
300 this->wait_strategy ()->register_handler () == -1)
302 // Registration failures.
303 if (TAO_debug_level > 0)
305 TAOLIB_ERROR ((LM_ERROR,
306 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::register_if_necessary, ")
307 ACE_TEXT ("could not register the transport ")
308 ACE_TEXT ("in the reactor.\n"),
309 this->id ()));
312 // Purge from the connection cache, if we are not in the cache, this
313 // just does nothing.
314 (void) this->purge_entry ();
316 // Close the handler.
317 (void) this->close_connection ();
319 return false;
321 return true;
324 void
325 TAO_Transport::close_connection ()
327 this->connection_handler_i ()->close_connection ();
331 TAO_Transport::register_handler ()
333 if (TAO_debug_level > 4)
335 TAOLIB_DEBUG ((LM_DEBUG,
336 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::register_handler\n"),
337 this->id ()));
340 ACE_Reactor * const r = this->orb_core_->reactor ();
342 // @@note: This should be okay since the register handler call will
343 // not make a nested call into the transport.
344 ACE_GUARD_RETURN (ACE_Lock,
345 ace_mon,
346 *this->handler_lock_,
347 false);
349 if (r == this->event_handler_i ()->reactor () &&
350 (this->wait_strategy ()->non_blocking () ||
351 !this->orb_core ()->client_factory ()->use_cleanup_options ()))
353 if (TAO_debug_level > 6)
355 TAOLIB_DEBUG ((LM_DEBUG,
356 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::register_handler - ")
357 ACE_TEXT ("already registered with reactor\n"),
358 this->id ()));
361 return 0;
364 if (TAO_debug_level > 6)
366 TAOLIB_DEBUG ((LM_DEBUG,
367 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::register_handler - ")
368 ACE_TEXT ("registering event handler with reactor\n"),
369 this->id ()));
372 // Set the flag in the Connection Handler and in the Wait Strategy
373 // @@Maybe we should set these flags after registering with the
374 // reactor. What if the registration fails???
375 this->ws_->is_registered (true);
377 // Register the handler with the reactor
378 return r->register_handler (this->event_handler_i (),
379 ACE_Event_Handler::READ_MASK);
383 TAO_Transport::remove_handler ()
385 if (TAO_debug_level > 4)
387 TAOLIB_DEBUG ((LM_DEBUG,
388 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::remove_handler\n"),
389 this->id ()));
392 ACE_Reactor * const r = this->orb_core_->reactor ();
394 // @@note: This should be okay since the remove handler call will
395 // not make a nested call into the transport.
396 ACE_GUARD_RETURN (ACE_Lock,
397 ace_mon,
398 *this->handler_lock_,
399 false);
402 if (this->event_handler_i ()->reactor () == nullptr)
404 return 0;
407 if (TAO_debug_level > 6)
409 TAOLIB_DEBUG ((LM_DEBUG,
410 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::remove_handler - ")
411 ACE_TEXT ("removing event handler from reactor\n"),
412 this->id ()));
415 // Set the flag in the Wait Strategy
416 this->ws_->is_registered (false);
418 // Remove the handler from the reactor
419 if (r->remove_handler (this->event_handler_i (),
420 ACE_Event_Handler::READ_MASK|
421 ACE_Event_Handler::DONT_CALL) == -1)
423 if (TAO_debug_level > 0)
424 TAOLIB_ERROR ((LM_ERROR,
425 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::remove_handler - ")
426 ACE_TEXT ("reactor->remove_handler failed\n"),
427 this->id ()));
428 return -1;
430 else
432 // reset the reactor property of the event handler or
433 // Transport::register_handler() will not re-register
434 // when called after us again.
435 this->event_handler_i ()->reactor (nullptr);
436 return 0;
440 #if TAO_HAS_SENDFILE == 1
441 ssize_t
442 TAO_Transport::sendfile (TAO_MMAP_Allocator * /* allocator */,
443 iovec * iov,
444 int iovcnt,
445 size_t &bytes_transferred,
446 TAO::Transport::Drain_Constraints const & dc)
448 // Concrete pluggable transport doesn't implement sendfile().
449 // Fallback on TAO_Transport::send().
451 // @@ We can probably refactor the TAO_IIOP_Transport::sendfile()
452 // implementation to this base class method, and leave any TCP
453 // specific configuration out of this base class method.
454 // -Ossama
455 return this->send (iov, iovcnt, bytes_transferred,
456 this->io_timeout (dc));
458 #endif /* TAO_HAS_SENDFILE==1 */
461 TAO_Transport::generate_locate_request (
462 TAO_Target_Specification &spec,
463 TAO_Operation_Details &opdetails,
464 TAO_OutputCDR &output)
466 if (this->messaging_object ()->generate_locate_request_header (opdetails,
467 spec,
468 output) == -1)
470 if (TAO_debug_level > 0)
472 TAOLIB_ERROR ((LM_ERROR,
473 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::generate_locate_request, ")
474 ACE_TEXT ("error while marshalling the LocateRequest header\n"),
475 this->id ()));
478 return -1;
481 return 0;
485 TAO_Transport::generate_request_header (
486 TAO_Operation_Details &opdetails,
487 TAO_Target_Specification &spec,
488 TAO_OutputCDR &output)
490 if (this->messaging_object ()->generate_request_header (opdetails,
491 spec,
492 output) == -1)
494 if (TAO_debug_level > 0)
496 TAOLIB_ERROR ((LM_ERROR,
497 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::generate_request_header, ")
498 ACE_TEXT ("error while marshalling the Request header\n"),
499 this->id()));
502 return -1;
505 return 0;
508 /// @todo Ideally the following should be inline.
509 /// @todo purge_entry has a return value, use it
511 TAO_Transport::recache_transport (TAO_Transport_Descriptor_Interface *desc)
513 // First purge our entry
514 this->purge_entry ();
516 // Then add ourselves to the cache
517 return this->transport_cache_manager ().cache_transport (desc, this);
521 TAO_Transport::purge_entry ()
523 if (TAO_debug_level > 3)
525 TAOLIB_DEBUG ((LM_DEBUG,
526 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::purge_entry, ")
527 ACE_TEXT ("entry is %@\n"),
528 this->id (), this->cache_map_entry_));
531 return this->transport_cache_manager ().purge_entry (this->cache_map_entry_);
534 bool
535 TAO_Transport::can_be_purged ()
537 return !this->tms_->has_request ();
541 TAO_Transport::make_idle ()
543 if (TAO_debug_level > 3)
545 TAOLIB_DEBUG ((LM_DEBUG,
546 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::make_idle\n"),
547 this->id ()));
550 return this->transport_cache_manager ().make_idle (this->cache_map_entry_);
554 TAO_Transport::update_transport ()
556 return this->transport_cache_manager ().update_entry (this->cache_map_entry_);
// Methods called and used in the output path of the ORB.
562 TAO_Transport::Drain_Result
563 TAO_Transport::handle_output (TAO::Transport::Drain_Constraints const & dc)
565 if (TAO_debug_level > 3)
567 TAOLIB_DEBUG ((LM_DEBUG,
568 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_output")
569 ACE_TEXT (" - block_on_io=%d, timeout=%d.%06d\n"),
570 this->id (),
571 dc.block_on_io(),
572 dc.timeout() ? dc.timeout()->sec() : static_cast<time_t> (-1),
573 dc.timeout() ? dc.timeout()->usec() : -1 ));
576 // The flushing strategy (potentially via the Reactor) wants to send
577 // more data, first check if there is a current message that needs
578 // more sending...
579 Drain_Result const retval = this->drain_queue (dc);
581 if (TAO_debug_level > 3)
583 TAOLIB_DEBUG ((LM_DEBUG,
584 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_output, ")
585 ACE_TEXT ("drain_queue returns %d/%d\n"),
586 this->id (),
587 static_cast<int> (retval.dre_), ACE_ERRNO_GET));
590 // Any errors are returned directly to the Reactor
591 return retval;
595 TAO_Transport::format_queue_message (TAO_OutputCDR &stream,
596 ACE_Time_Value *max_wait_time,
597 TAO_Stub* stub)
599 if (this->messaging_object ()->format_message (stream, stub, nullptr) != 0)
600 return -1;
602 if (this->queue_message_i (stream.begin (), max_wait_time) != 0)
603 return -1;
605 // check the buffering constraints to see what must be done in post_open()
606 bool must_flush = false;
607 this->flush_in_post_open_ |=
608 this->check_buffering_constraints_i (stub,
609 must_flush);
611 return 0;
615 TAO_Transport::send_message_block_chain (const ACE_Message_Block *mb,
616 size_t &bytes_transferred,
617 ACE_Time_Value *max_wait_time)
619 ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, -1);
621 TAO::Transport::Drain_Constraints dc(
622 max_wait_time, true);
624 return this->send_message_block_chain_i (mb,
625 bytes_transferred,
626 dc);
630 TAO_Transport::send_message_block_chain_i (const ACE_Message_Block *mb,
631 size_t &bytes_transferred,
632 TAO::Transport::Drain_Constraints const & dc)
634 size_t const total_length = mb->total_length ();
636 // We are going to block, so there is no need to clone
637 // the message block.
638 TAO_Synch_Queued_Message synch_message (mb, this->orb_core_);
640 synch_message.push_back (this->head_, this->tail_);
642 Drain_Result const n = this->drain_queue_i (dc);
644 if (n == DR_ERROR)
646 synch_message.remove_from_list (this->head_, this->tail_);
647 return -1; // Error while sending...
649 else if (n == DR_QUEUE_EMPTY)
651 bytes_transferred = total_length;
652 return 1; // Empty queue, message was sent..
655 // Remove the temporary message from the queue...
656 synch_message.remove_from_list (this->head_, this->tail_);
658 bytes_transferred = total_length - synch_message.message_length ();
660 return 0;
664 TAO_Transport::send_synchronous_message_i (const ACE_Message_Block *mb,
665 ACE_Time_Value *max_wait_time)
667 // We are going to block, so there is no need to clone
668 // the message block.
669 size_t const total_length = mb->total_length ();
670 TAO_Synch_Queued_Message synch_message (mb, this->orb_core_);
672 synch_message.push_back (this->head_, this->tail_);
674 int const result = this->send_synch_message_helper_i (synch_message,
675 max_wait_time);
676 if (result == -1 && errno == ETIME)
678 if (total_length == synch_message.message_length ()) //none was sent
680 if (TAO_debug_level > 2)
682 TAOLIB_DEBUG ((LM_DEBUG,
683 ACE_TEXT ("TAO (%P|%t) - ")
684 ACE_TEXT ("Transport[%d]::send_synchronous_message_i, ")
685 ACE_TEXT ("timeout encountered before any bytes sent\n"),
686 this->id ()));
688 throw ::CORBA::TIMEOUT (
689 CORBA::SystemException::_tao_minor_code (
690 TAO_TIMEOUT_SEND_MINOR_CODE,
691 ETIME),
692 CORBA::COMPLETED_NO);
694 else
696 return -1;
699 else if(result == -1 || result == 1)
701 return result;
704 TAO_Flushing_Strategy *flushing_strategy =
705 this->orb_core ()->flushing_strategy ();
706 if (flushing_strategy->schedule_output (this) == -1)
708 synch_message.remove_from_list (this->head_, this->tail_);
709 if (TAO_debug_level > 0)
711 TAOLIB_ERROR ((LM_ERROR,
712 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::")
713 ACE_TEXT ("send_synchronous_message_i, ")
714 ACE_TEXT ("error while scheduling flush - %m\n"),
715 this->id ()));
717 return -1;
720 // No need to check for result == TAO_Flushing_Strategy::MUST_FLUSH,
721 // because we're always going to flush anyway.
723 // Release the mutex, other threads may modify the queue as we
724 // block for a long time writing out data.
725 int flush_result;
727 typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK;
728 TAO_REVERSE_LOCK reverse (*this->handler_lock_);
729 ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1);
731 flush_result = flushing_strategy->flush_message (this,
732 &synch_message,
733 max_wait_time);
736 if (flush_result == -1)
738 synch_message.remove_from_list (this->head_, this->tail_);
740 // We don't need to do anything special for the timeout case.
741 // The connection is going to get closed and the Transport destroyed.
742 // The only thing to do maybe is to empty the queue.
744 if (TAO_debug_level > 0)
746 TAOLIB_ERROR ((LM_ERROR,
747 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_synchronous_message_i, ")
748 ACE_TEXT ("error while sending message - %m\n"),
749 this->id ()));
752 return -1;
755 return 1;
759 TAO_Transport::send_reply_message_i (const ACE_Message_Block *mb,
760 ACE_Time_Value *max_wait_time)
762 // Don't clone now.. We could be sent in one shot!
763 TAO_Synch_Queued_Message synch_message (mb, this->orb_core_);
765 synch_message.push_back (this->head_, this->tail_);
767 int const n =
768 this->send_synch_message_helper_i (synch_message, max_wait_time);
770 // What about partially sent messages.
771 if (n == -1 || n == 1)
773 return n;
776 if (TAO_debug_level > 3)
778 TAOLIB_DEBUG ((LM_DEBUG,
779 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_reply_message_i, ")
780 ACE_TEXT ("preparing to add to queue before leaving\n"),
781 this->id ()));
784 // Till this point we shouldn't have any copying and that is the
785 // point anyway. Now, remove the node from the list
786 synch_message.remove_from_list (this->head_, this->tail_);
788 // Clone the node that we have.
789 TAO_Queued_Message *msg =
790 synch_message.clone (this->orb_core_->transport_message_buffer_allocator ());
792 // Stick it in the queue
793 msg->push_back (this->head_, this->tail_);
795 TAO_Flushing_Strategy *flushing_strategy =
796 this->orb_core ()->flushing_strategy ();
798 int const result = flushing_strategy->schedule_output (this);
800 if (result == -1)
802 if (TAO_debug_level > 5)
804 TAOLIB_DEBUG ((LM_DEBUG, "TAO (%P|%t) - Transport[%d]::send_reply_"
805 "message_i, dequeuing msg due to schedule_output "
806 "failure\n", this->id ()));
808 msg->remove_from_list (this->head_, this->tail_);
809 msg->destroy ();
811 else if (result == TAO_Flushing_Strategy::MUST_FLUSH)
813 typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK;
814 TAO_REVERSE_LOCK reverse (*this->handler_lock_);
815 ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1);
816 (void) flushing_strategy->flush_transport (this, nullptr);
819 return 1;
823 TAO_Transport::send_synch_message_helper_i (TAO_Synch_Queued_Message &synch_message,
824 ACE_Time_Value * max_wait_time)
826 TAO::Transport::Drain_Constraints dc(
827 max_wait_time, this->using_blocking_io_for_synch_messages());
829 Drain_Result const n = this->drain_queue_i (dc);
831 if (n == DR_ERROR)
833 synch_message.remove_from_list (this->head_, this->tail_);
834 return -1; // Error while sending...
836 else if (n == DR_QUEUE_EMPTY)
838 return 1; // Empty queue, message was sent..
841 if (synch_message.all_data_sent ())
843 return 1;
846 return 0;
850 TAO_Transport::schedule_output_i ()
852 ACE_Event_Handler * const eh = this->event_handler_i ();
853 ACE_Reactor * const reactor = eh->reactor ();
855 if (reactor == nullptr)
857 if (TAO_debug_level > 1)
859 TAOLIB_ERROR ((LM_ERROR,
860 ACE_TEXT ("TAO (%P|%t) - ")
861 ACE_TEXT ("Transport[%d]::schedule_output_i, ")
862 ACE_TEXT ("no reactor,")
863 ACE_TEXT ("returning -1\n"),
864 this->id ()));
866 return -1;
869 // Check to see if our event handler is still registered with the
870 // reactor. It's possible for another thread to have run close_connection()
871 // since we last used the event handler.
872 ACE_Event_Handler * const found = reactor->find_handler (eh->get_handle ());
873 if (found)
875 found->remove_reference ();
877 if (found != eh)
879 if (TAO_debug_level > 3)
881 TAOLIB_ERROR ((LM_ERROR,
882 ACE_TEXT ("TAO (%P|%t) - ")
883 ACE_TEXT ("Transport[%d]::schedule_output_i ")
884 ACE_TEXT ("event handler not found in reactor,")
885 ACE_TEXT ("returning -1\n"),
886 this->id ()));
889 return -1;
893 if (TAO_debug_level > 3)
895 TAOLIB_DEBUG ((LM_DEBUG,
896 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::schedule_output_i\n"),
897 this->id ()));
900 return reactor->schedule_wakeup (eh, ACE_Event_Handler::WRITE_MASK);
904 TAO_Transport::cancel_output_i ()
906 ACE_Event_Handler * const eh = this->event_handler_i ();
907 ACE_Reactor *const reactor = eh->reactor ();
909 if (TAO_debug_level > 3)
911 TAOLIB_DEBUG ((LM_DEBUG,
912 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cancel_output_i\n"),
913 this->id ()));
916 return reactor->cancel_wakeup (eh, ACE_Event_Handler::WRITE_MASK);
920 TAO_Transport::handle_timeout (const ACE_Time_Value & /* current_time */,
921 const void *act)
923 if (TAO_debug_level > 6)
925 TAOLIB_DEBUG ((LM_DEBUG,
926 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_timeout, ")
927 ACE_TEXT ("timer expired\n"),
928 this->id ()));
931 /// This is the only legal ACT in the current configuration....
932 if (act != &this->current_deadline_)
934 return -1;
937 if (this->flush_timer_pending ())
939 // The timer is always a oneshot timer, so mark is as not
940 // pending.
941 this->reset_flush_timer ();
943 TAO_Flushing_Strategy *flushing_strategy =
944 this->orb_core ()->flushing_strategy ();
945 int const result = flushing_strategy->schedule_output (this);
946 if (result == TAO_Flushing_Strategy::MUST_FLUSH)
948 typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK;
949 TAO_REVERSE_LOCK reverse (*this->handler_lock_);
950 ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1);
951 if (flushing_strategy->flush_transport (this, nullptr) == -1) {
952 return -1;
957 return 0;
960 TAO_Transport::Drain_Result
961 TAO_Transport::drain_queue (TAO::Transport::Drain_Constraints const & dc)
963 ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, DR_ERROR);
964 Drain_Result const retval = this->drain_queue_i (dc);
966 if (retval == DR_QUEUE_EMPTY)
968 // ... there is no current message or it was completely
969 // sent, cancel output...
970 TAO_Flushing_Strategy *flushing_strategy =
971 this->orb_core ()->flushing_strategy ();
973 flushing_strategy->cancel_output (this);
975 return DR_OK;
978 return retval;
981 TAO_Transport::Drain_Result
982 TAO_Transport::drain_queue_helper (int &iovcnt, iovec iov[],
983 TAO::Transport::Drain_Constraints const & dc)
985 // As a side-effect, this decrements the timeout() pointed-to value by
986 // the time used in this function. That might be important as there are
987 // potentially long running system calls invoked from here.
988 TAO::ORB_Countdown_Time countdown(dc.timeout());
990 size_t byte_count = 0;
992 // ... send the message ...
993 ssize_t retval = -1;
995 #if TAO_HAS_SENDFILE == 1
996 if (this->mmap_allocator_)
997 retval = this->sendfile (this->mmap_allocator_,
998 iov,
999 iovcnt,
1000 byte_count,
1001 dc);
1002 else
1003 #endif /* TAO_HAS_SENDFILE==1 */
1004 retval = this->send (iov, iovcnt, byte_count,
1005 this->io_timeout (dc));
1007 if (TAO_debug_level > 9)
1009 dump_iov (iov, iovcnt, this->id (),
1010 byte_count, ACE_TEXT("drain_queue_helper"));
1013 if (retval == 0)
1015 if (TAO_debug_level > 4)
1017 TAOLIB_DEBUG ((LM_DEBUG,
1018 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ")
1019 ACE_TEXT ("send() returns 0\n"),
1020 this->id ()));
1022 return DR_ERROR;
1024 else if (retval == -1)
1026 if (TAO_debug_level > 4)
1028 TAOLIB_DEBUG ((LM_DEBUG,
1029 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ")
1030 ACE_TEXT ("error during send() (errno: %d) - %m\n"),
1031 this->id (), ACE_ERRNO_GET));
1034 if (errno == EWOULDBLOCK || errno == EAGAIN)
1036 return DR_WOULDBLOCK;
1039 return DR_ERROR;
1042 // ... now we need to update the queue, removing elements
1043 // that have been sent, and updating the last element if it
1044 // was only partially sent ...
1045 this->cleanup_queue (byte_count);
1046 iovcnt = 0;
1048 // ... start over, how do we guarantee progress? Because if
1049 // no bytes are sent send() can only return 0 or -1
1051 // Total no. of bytes sent for a send call
1052 this->sent_byte_count_ += byte_count;
1054 if (TAO_debug_level > 4)
1056 TAOLIB_DEBUG ((LM_DEBUG,
1057 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_helper, ")
1058 ACE_TEXT ("byte_count = %d, head_is_empty = %d\n"),
1059 this->id(), byte_count, this->queue_is_empty_i ()));
1062 return DR_QUEUE_EMPTY;
1063 // drain_queue_i will check if the queue is actually empty
1066 TAO_Transport::Drain_Result
1067 TAO_Transport::drain_queue_i (TAO::Transport::Drain_Constraints const & dc)
1069 // This is the vector used to send data, it must be declared outside
1070 // the loop because after the loop there may still be data to be
1071 // sent
1072 int iovcnt = 0;
1073 #if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE)
1074 iovec iov[ACE_IOV_MAX] = { { nullptr , 0 } };
1075 #else
1076 iovec iov[ACE_IOV_MAX];
1077 #endif /* ACE_INITIALIZE_MEMORY_BEFORE_USE */
1079 // We loop over all the elements in the queue ...
1080 TAO_Queued_Message *i = this->head_;
1082 // Reset the value so that the counting is done for each new send
1083 // call.
1084 this->sent_byte_count_ = 0;
1086 // Avoid calling this expensive function each time through the loop. Instead
1087 // we'll assume that the time is unlikely to change much during the loop.
1088 // If we are forced to send in the loop then we'll recompute the time.
1089 ACE_Time_Value now = ACE_High_Res_Timer::gettimeofday_hr ();
1091 while (i != nullptr)
1093 if (i->is_expired (now))
1095 if (TAO_debug_level > 3)
1097 TAOLIB_DEBUG ((LM_DEBUG,
1098 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_i, ")
1099 ACE_TEXT ("Discarding expired queued message.\n"),
1100 this->id ()));
1102 TAO_Queued_Message *next = i->next ();
1103 i->state_changed (TAO_LF_Event::LFS_TIMEOUT,
1104 this->orb_core_->leader_follower ());
1105 i->remove_from_list (this->head_, this->tail_);
1106 i->destroy ();
1107 i = next;
1108 continue;
1110 // ... each element fills the iovector ...
1111 i->fill_iov (ACE_IOV_MAX, iovcnt, iov);
1113 // ... the vector is full, no choice but to send some data out.
1114 // We need to loop because a single message can span multiple
1115 // IOV_MAX elements ...
1116 if (iovcnt == ACE_IOV_MAX)
1118 Drain_Result const retval =
1119 this->drain_queue_helper (iovcnt, iov, dc);
1121 if (TAO_debug_level > 4)
1123 TAOLIB_DEBUG ((LM_DEBUG,
1124 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_i, ")
1125 ACE_TEXT ("helper retval = %d\n"),
1126 this->id (), static_cast<int> (retval.dre_)));
1129 if (retval != DR_QUEUE_EMPTY)
1131 return retval;
1134 now = ACE_High_Res_Timer::gettimeofday_hr ();
1136 i = this->head_;
1137 continue;
1139 // ... notice that this line is only reached if there is still
1140 // room in the iovector ...
1141 i = i->next ();
1144 if (iovcnt != 0)
1146 Drain_Result const retval = this->drain_queue_helper (iovcnt, iov, dc);
1148 if (TAO_debug_level > 4)
1150 TAOLIB_DEBUG ((LM_DEBUG,
1151 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::drain_queue_i, ")
1152 ACE_TEXT ("helper retval = %d\n"),
1153 this->id (), static_cast<int> (retval.dre_)));
1156 if (retval != DR_QUEUE_EMPTY)
1158 return retval;
1162 if (this->queue_is_empty_i ())
1164 if (this->flush_timer_pending ())
1166 ACE_Event_Handler *eh = this->event_handler_i ();
1167 ACE_Reactor * const reactor = eh->reactor ();
1168 reactor->cancel_timer (this->flush_timer_id_);
1169 this->reset_flush_timer ();
1172 return DR_QUEUE_EMPTY;
1175 return DR_OK;
1178 void
1179 TAO_Transport::cleanup_queue_i ()
1181 if (TAO_debug_level > 4)
1183 TAOLIB_DEBUG ((LM_DEBUG,
1184 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue_i, ")
1185 ACE_TEXT ("cleaning up complete queue\n"),
1186 this->id ()));
1189 size_t byte_count = 0;
1190 int msg_count = 0;
1192 // Cleanup all messages
1193 while (!this->queue_is_empty_i ())
1195 TAO_Queued_Message *i = this->head_;
1197 if (TAO_debug_level > 4)
1199 byte_count += i->message_length();
1200 ++msg_count;
1202 // @@ This is a good point to insert a flag to indicate that a
1203 // CloseConnection message was successfully received.
1204 i->state_changed (TAO_LF_Event::LFS_CONNECTION_CLOSED,
1205 this->orb_core_->leader_follower ());
1207 i->remove_from_list (this->head_, this->tail_);
1209 i->destroy ();
1212 if (TAO_debug_level > 4)
1214 TAOLIB_DEBUG ((LM_DEBUG,
1215 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue_i, ")
1216 ACE_TEXT ("discarded %d messages, %u bytes.\n"),
1217 this->id (), msg_count, byte_count));
1221 void
1222 TAO_Transport::cleanup_queue (size_t byte_count)
1224 while (!this->queue_is_empty_i () && byte_count > 0)
1226 TAO_Queued_Message *i = this->head_;
1228 if (TAO_debug_level > 4)
1230 TAOLIB_DEBUG ((LM_DEBUG,
1231 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue, ")
1232 ACE_TEXT ("byte_count = %d\n"),
1233 this->id (), byte_count));
1236 // Update the state of the first message
1237 i->bytes_transferred (byte_count);
1239 if (TAO_debug_level > 4)
1241 TAOLIB_DEBUG ((LM_DEBUG,
1242 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::cleanup_queue, ")
1243 ACE_TEXT ("after transfer, bc = %d, all_sent = %d, ml = %d\n"),
1244 this->id (), byte_count, i->all_data_sent (),
1245 i->message_length ()));
1248 // ... if all the data was sent the message must be removed from
1249 // the queue...
1250 if (i->all_data_sent ())
1252 i->remove_from_list (this->head_, this->tail_);
1253 i->destroy ();
1255 else if (byte_count == 0)
1257 // If we have sent out a full message block, but we are not
1258 // finished with this message, we need to do something with the
1259 // message block chain held by our output stream. If we don't,
1260 // another thread can attempt to service this transport and end
1261 // up resetting the output stream which will release the
1262 // message that we haven't finished sending.
1263 i->copy_if_necessary (this->out_stream ().begin ());
1268 bool
1269 TAO_Transport::check_buffering_constraints_i (TAO_Stub *stub, bool &must_flush)
1271 // First let's compute the size of the queue:
1272 size_t msg_count = 0;
1273 size_t total_bytes = 0;
1275 for (TAO_Queued_Message *i = this->head_; i != nullptr; i = i->next ())
1277 ++msg_count;
1278 total_bytes += i->message_length ();
1281 bool set_timer = false;
1282 ACE_Time_Value new_deadline;
1284 TAO::Transport_Queueing_Strategy *queue_strategy =
1285 stub->transport_queueing_strategy ();
1287 bool constraints_reached = true;
1289 if (queue_strategy)
1291 constraints_reached =
1292 queue_strategy->buffering_constraints_reached (stub,
1293 msg_count,
1294 total_bytes,
1295 must_flush,
1296 this->current_deadline_,
1297 set_timer,
1298 new_deadline);
1300 else
1302 must_flush = false;
1305 // ... set the new timer, also cancel any previous timers ...
1306 // Check for connected state since this method may be called
1307 // before the connection is established and than there will be no
1308 // reactor available yet.
1309 if (set_timer && this->is_connected_)
1311 ACE_Event_Handler *eh = this->event_handler_i ();
1312 ACE_Reactor * const reactor = eh->reactor ();
1313 this->current_deadline_ = new_deadline;
1314 ACE_Time_Value delay = new_deadline - ACE_OS::gettimeofday ();
1316 if (this->flush_timer_pending ())
1318 reactor->cancel_timer (this->flush_timer_id_);
1321 this->flush_timer_id_ =
1322 reactor->schedule_timer (&this->transport_timer_,
1323 &this->current_deadline_,
1324 delay);
1327 return constraints_reached;
1330 void
1331 TAO_Transport::report_invalid_event_handler (const char *caller)
1333 if (TAO_debug_level > 0)
1335 TAOLIB_DEBUG ((LM_DEBUG,
1336 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::report_invalid_event_handler")
1337 ACE_TEXT ("(%C) no longer associated with handler [tag=%d]\n"),
1338 this->id (), caller, this->tag_));
1342 void
1343 TAO_Transport::send_connection_closed_notifications ()
1346 ACE_MT (ACE_GUARD (ACE_Lock, guard, *this->handler_lock_));
1348 this->send_connection_closed_notifications_i ();
1351 this->tms ()->connection_closed ();
1354 void
1355 TAO_Transport::send_connection_closed_notifications_i ()
1357 this->cleanup_queue_i ();
1361 TAO_Transport::send_message_shared_i (TAO_Stub *stub,
1362 TAO_Message_Semantics message_semantics,
1363 const ACE_Message_Block *message_block,
1364 ACE_Time_Value *max_wait_time)
1366 int ret = 0;
1368 #if TAO_HAS_TRANSPORT_CURRENT == 1
1369 size_t const message_length = message_block->length ();
1370 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */
1372 switch (message_semantics.type_)
1374 case TAO_Message_Semantics::TAO_TWOWAY_REQUEST:
1375 ret = this->send_synchronous_message_i (message_block, max_wait_time);
1376 break;
1378 case TAO_Message_Semantics::TAO_REPLY:
1379 ret = this->send_reply_message_i (message_block, max_wait_time);
1380 break;
1382 case TAO_Message_Semantics::TAO_ONEWAY_REQUEST:
1383 ret = this->send_asynchronous_message_i (stub,
1384 message_block,
1385 max_wait_time);
1386 break;
1389 #if TAO_HAS_TRANSPORT_CURRENT == 1
1390 // "Count" the message, only if no error was encountered.
1391 if (ret != -1 && this->stats_ != nullptr)
1392 this->stats_->messages_sent (message_length);
1393 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */
1395 return ret;
// Send a message asynchronously: try to write it immediately when
// that is allowed, otherwise (or when only part of it could be
// written) put it on the outgoing queue.
//
// Flow, as implemented below:
//  - A direct send is attempted only when the queue is empty and the
//    stub's queueing strategy, if any, does not answer must_queue().
//  - A direct send failing with an errno other than EWOULDBLOCK or
//    ETIME returns -1.  A complete send returns 0 immediately.
//  - ETIME with no byte sent throws CORBA::TIMEOUT instead of
//    returning -1.
//  - Whatever remains is queued: at the front with no wait time when
//    the message was partially sent, at the back with max_wait_time
//    otherwise.  A queueing failure returns -1.
//  - A timeout after a partial send returns -1.
//  - Without a timeout the buffering constraints are checked, output
//    may be scheduled, and the transport may be flushed with the
//    handler lock released through an ACE_Reverse_Lock guard.  A flush
//    failing with ETIME while sent_byte_count_ is unchanged throws
//    CORBA::TIMEOUT; any other flush failure returns -1.
//  - All remaining paths return 0.
1399 TAO_Transport::send_asynchronous_message_i (TAO_Stub *stub,
1400 const ACE_Message_Block *message_block,
1401 ACE_Time_Value *max_wait_time)
1403 // Let's figure out if the message should be queued without trying
1404 // to send first:
1405 bool try_sending_first = true;
1407 bool const queue_empty = this->queue_is_empty_i ();
1409 TAO::Transport_Queueing_Strategy *queue_strategy =
1410 stub->transport_queueing_strategy ();
1412 if (!queue_empty)
1414 try_sending_first = false;
1416 else if (queue_strategy)
1418 if (queue_strategy->must_queue (queue_empty))
1420 try_sending_first = false;
1424 bool partially_sent = false;
1425 bool timeout_encountered = false;
1427 TAO::Transport::Drain_Constraints dc(
1428 max_wait_time, this->using_blocking_io_for_asynch_messages());
1430 if (try_sending_first)
1432 ssize_t n = 0;
1433 size_t byte_count = 0;
1434 // ... in this case we must try to send the message first ...
1436 size_t const total_length = message_block->total_length ();
1438 if (TAO_debug_level > 6)
1440 TAOLIB_DEBUG ((LM_DEBUG,
1441 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
1442 ACE_TEXT ("trying to send the message (ml = %d)\n"),
1443 this->id (), total_length));
1446 // @@ I don't think we want to hold the mutex here, however if
1447 // we release it we need to recheck the status of the transport
1448 // after we return... once I understand the final form for this
1449 // code I will re-visit this decision
1450 n = this->send_message_block_chain_i (message_block,
1451 byte_count,
1452 dc);
1454 if (n == -1)
1456 // ... if this is just an EWOULDBLOCK we must schedule the
1457 // message for later, if it is ETIME we still have to send
1458 // the complete message, because cutting off the message at
1459 // this point will destroy the synchronization with the
1460 // server ...
1461 if (errno != EWOULDBLOCK && errno != ETIME)
1463 if (TAO_debug_level > 0)
1465 TAOLIB_ERROR ((LM_ERROR,
1466 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
1467 ACE_TEXT ("fatal error in ")
1468 ACE_TEXT ("send_message_block_chain_i - %m\n"),
1469 this->id ()));
1471 return -1;
1475 // ... let's figure out if the complete message was sent ...
1476 if (total_length == byte_count)
1478 // Done, just return. Notice that there are no allocations
1479 // or copies up to this point (though some fancy calling
1480 // back and forth).
1481 // This is the common case for the critical path, it should
1482 // be fast.
1483 return 0;
1486 if (byte_count > 0)
1488 partially_sent = true;
1491 // If it was partially sent, then push to front of queue and don't flush
1492 if (n == -1 && errno == ETIME)
1494 timeout_encountered = true;
1495 if (byte_count == 0)
1497 // This request has timed out and none of it was sent to the transport
1498 // We can't return -1 here, since that would end up closing the transport
1499 if (TAO_debug_level > 2)
1501 TAOLIB_DEBUG ((LM_DEBUG,
1502 ACE_TEXT ("TAO (%P|%t) - ")
1503 ACE_TEXT ("Transport[%d]::send_asynchronous_message_i, ")
1504 ACE_TEXT ("timeout encountered before any bytes sent\n"),
1505 this->id ()));
1507 throw ::CORBA::TIMEOUT (
1508 CORBA::SystemException::_tao_minor_code (
1509 TAO_TIMEOUT_SEND_MINOR_CODE,
1510 ETIME),
1511 CORBA::COMPLETED_NO);
1515 if (TAO_debug_level > 6)
1517 TAOLIB_DEBUG ((LM_DEBUG,
1518 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
1519 ACE_TEXT ("partial send %d / %d bytes\n"),
1520 this->id (), byte_count, total_length));
1523 // ... part of the data was sent, need to figure out what piece
1524 // of the message block chain must be queued ...
// Skip leading blocks in the chain whose length is zero.
1525 while (message_block != nullptr && message_block->length () == 0)
1527 message_block = message_block->cont ();
1530 // ... at least some portion of the message block chain should
1531 // remain ...
1534 // ... either the message must be queued or we need to queue it
1535 // because it was not completely sent out ...
// A partially sent message is queued without a wait time and, since
// the last argument (back) is then false, at the front of the queue.
1537 ACE_Time_Value *wait_time = (partially_sent ? nullptr: max_wait_time);
1538 if (this->queue_message_i (message_block, wait_time, !partially_sent)
1539 == -1)
1541 if (TAO_debug_level > 0)
1543 TAOLIB_DEBUG ((LM_DEBUG,
1544 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::")
1545 ACE_TEXT ("send_asynchronous_message_i, ")
1546 ACE_TEXT ("cannot queue message for - %m\n"),
1547 this->id ()));
1549 return -1;
1552 if (TAO_debug_level > 6)
1554 TAOLIB_DEBUG ((LM_DEBUG,
1555 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::send_asynchronous_message_i, ")
1556 ACE_TEXT ("message is queued\n"),
1557 this->id ()));
1560 if (timeout_encountered && partially_sent)
1562 //Must close down the transport here since we can't guarantee the
1563 //integrity of the GIOP stream (the next send may try to write to
1564 //the socket before looking at the queue).
1565 if (TAO_debug_level > 0)
1567 TAOLIB_DEBUG ((LM_DEBUG,
1568 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::")
1569 ACE_TEXT ("send_asynchronous_message_i, ")
1570 ACE_TEXT ("timeout after partial send, closing.\n"),
1571 this->id ()));
1573 return -1;
1575 else if (!timeout_encountered)
1577 // We can't flush if we have already encountered a timeout
1578 // ... if the queue is full we need to activate the output on the
1579 // queue ...
1580 bool must_flush = false;
1581 const bool constraints_reached =
1582 this->check_buffering_constraints_i (stub,
1583 must_flush);
1585 // ... but we also want to activate it if the message was partially
1586 // sent.... Plus, when we use the blocking flushing strategy the
1587 // queue is flushed as a side-effect of 'schedule_output()'
1589 TAO_Flushing_Strategy *flushing_strategy =
1590 this->orb_core ()->flushing_strategy ();
1592 if (constraints_reached || try_sending_first)
1594 int const result = flushing_strategy->schedule_output (this);
1595 if (result == TAO_Flushing_Strategy::MUST_FLUSH)
1597 must_flush = true;
1601 if (must_flush)
1603 if (TAO_debug_level > 0)
1605 TAOLIB_DEBUG ((LM_DEBUG,
1606 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::")
1607 ACE_TEXT ("send_asynchronous_message_i, ")
1608 ACE_TEXT ("flushing transport.\n"),
1609 this->id ()));
// Remember the byte counter so that a flush which sent nothing can be
// recognised afterwards.
1612 size_t const sent_byte = sent_byte_count_;
1613 int ret = 0;
// The flush runs under a guard on a reverse lock built from
// handler_lock_.
1615 typedef ACE_Reverse_Lock<ACE_Lock> TAO_REVERSE_LOCK;
1616 TAO_REVERSE_LOCK reverse (*this->handler_lock_);
1617 ACE_GUARD_RETURN (TAO_REVERSE_LOCK, ace_mon, reverse, -1);
1618 ret = flushing_strategy->flush_transport (this, max_wait_time);
1621 if (ret == -1)
1623 if (errno == ETIME)
1625 if (sent_byte == sent_byte_count_) // if nothing was actually flushed
1627 // This request has timed out and none of it was sent to the transport
1628 // We can't return -1 here, since that would end up closing the transport
1629 if (TAO_debug_level > 2)
1631 TAOLIB_DEBUG ((LM_DEBUG,
1632 ACE_TEXT ("TAO (%P|%t) - ")
1633 ACE_TEXT ("Transport[%d]::send_asynchronous_message_i, ")
1634 ACE_TEXT ("2 timeout encountered before any bytes sent\n"),
1635 this->id ()));
1637 throw ::CORBA::TIMEOUT (CORBA::SystemException::_tao_minor_code
1638 (TAO_TIMEOUT_SEND_MINOR_CODE, ETIME),
1639 CORBA::COMPLETED_NO);
1642 return -1;
1646 return 0;
1650 TAO_Transport::queue_message_i (const ACE_Message_Block *message_block,
1651 ACE_Time_Value *max_wait_time, bool back)
1653 TAO_Queued_Message *queued_message = nullptr;
1654 ACE_NEW_RETURN (queued_message,
1655 TAO_Asynch_Queued_Message (message_block,
1656 this->orb_core_,
1657 max_wait_time,
1658 nullptr,
1659 true),
1660 -1);
1661 if (back) {
1662 queued_message->push_back (this->head_, this->tail_);
1664 else {
1665 queued_message->push_front (this->head_, this->tail_);
1668 return 0;
/*
 * All the methods relevant to the incoming data path of the ORB are
 * defined below.
 */
1676 TAO_Transport::handle_input (TAO_Resume_Handle &rh,
1677 ACE_Time_Value * max_wait_time)
1679 if (TAO_debug_level > 3)
1681 TAOLIB_DEBUG ((LM_DEBUG,
1682 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input\n"),
1683 this->id ()));
1686 // First try to process messages of the head of the incoming queue.
1687 int const retval = this->process_queue_head (rh);
1689 if (retval <= 0)
1691 if (retval == -1)
1693 if (TAO_debug_level > 2)
1695 TAOLIB_ERROR ((LM_ERROR,
1696 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input, ")
1697 ACE_TEXT ("error while parsing the head of the queue\n"),
1698 this->id()));
1700 return -1;
1702 else
1704 // retval == 0
1706 // Processed a message in queue successfully. This
1707 // thread must return to thread-pool now.
1708 return 0;
1712 TAO_Queued_Data *q_data = nullptr;
1714 if (this->incoming_message_stack_.top (q_data) != -1
1715 && q_data->missing_data () != TAO_MISSING_DATA_UNDEFINED)
1717 /* PRE: q_data->missing_data_ > 0 as all QD on stack must be incomplete */
1718 if (this->handle_input_missing_data (rh, max_wait_time, q_data) == -1)
1720 if (TAO_debug_level > 0)
1722 TAOLIB_ERROR ((LM_ERROR,
1723 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input, ")
1724 ACE_TEXT ("error consolidating incoming message\n"),
1725 this->id ()));
1727 return -1;
1730 else
1732 if (this->handle_input_parse_data (rh, max_wait_time) == -1)
1734 if (TAO_debug_level > 0)
1736 TAOLIB_ERROR ((LM_ERROR,
1737 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input, ")
1738 ACE_TEXT ("error parsing incoming message\n"),
1739 this->id ()));
1741 return -1;
1745 return 0;
1749 TAO_Transport::consolidate_process_message (TAO_Queued_Data *q_data,
1750 TAO_Resume_Handle &rh)
1752 // paranoid check
1753 if (q_data->missing_data () != 0)
1755 if (TAO_debug_level > 0)
1757 TAOLIB_ERROR ((LM_ERROR,
1758 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ")
1759 ACE_TEXT ("missing data\n"),
1760 this->id ()));
1762 return -1;
1765 if (q_data->more_fragments () ||
1766 q_data->msg_type () == GIOP::Fragment)
1768 // consolidate message on top of stack, only for fragmented messages
1769 TAO_Queued_Data *new_q_data = nullptr;
1771 switch (this->messaging_object()->consolidate_fragmented_message (q_data, new_q_data))
1773 case -1: // error
1774 return -1;
1776 case 0: // returning consolidated message in q_data
1777 if (!new_q_data)
1779 if (TAO_debug_level > 0)
1781 TAOLIB_ERROR ((LM_ERROR,
1782 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ")
1783 ACE_TEXT ("error, consolidated message is NULL\n"),
1784 this->id ()));
1786 return -1;
1790 if (this->process_parsed_messages (new_q_data, rh) == -1)
1792 TAO_Queued_Data::release (new_q_data);
1794 if (TAO_debug_level > 0)
1796 TAOLIB_ERROR ((LM_ERROR,
1797 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ")
1798 ACE_TEXT ("error processing consolidated message\n"),
1799 this->id ()));
1801 return -1;
1804 TAO_Queued_Data::release (new_q_data);
1806 break;
1808 case 1: // fragment has been stored in messaging_oject()
1809 break;
1812 else
1814 if (this->process_parsed_messages (q_data, rh) == -1)
1816 TAO_Queued_Data::release (q_data);
1818 if (TAO_debug_level > 0)
1820 TAOLIB_ERROR ((LM_ERROR,
1821 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_process_message, ")
1822 ACE_TEXT ("error processing message\n"),
1823 this->id ()));
1825 return -1;
1828 TAO_Queued_Data::release (q_data);
1831 return 0;
1835 TAO_Transport::consolidate_enqueue_message (TAO_Queued_Data *q_data)
1837 // consolidate message on top of stack, only for fragmented messages
1839 // paranoid check
1840 if (q_data->missing_data () != 0)
1842 return -1;
1845 if (q_data->more_fragments () ||
1846 q_data->msg_type () == GIOP::Fragment)
1848 TAO_Queued_Data *new_q_data = nullptr;
1850 switch (this->messaging_object()->consolidate_fragmented_message (q_data, new_q_data))
1852 case -1: // error
1853 return -1;
1855 case 0: // returning consolidated message in new_q_data
1856 if (!new_q_data)
1858 if (TAO_debug_level > 0)
1860 TAOLIB_ERROR ((LM_ERROR,
1861 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::consolidate_enqueue_message, ")
1862 ACE_TEXT ("error, consolidated message is NULL\n"),
1863 this->id ()));
1865 return -1;
1868 if (this->incoming_message_queue_.enqueue_tail (new_q_data) != 0)
1870 TAO_Queued_Data::release (new_q_data);
1871 return -1;
1873 break;
1875 case 1: // fragment has been stored in messaging_oject()
1876 break;
1879 else
1881 if (this->incoming_message_queue_.enqueue_tail (q_data) != 0)
1883 TAO_Queued_Data::release (q_data);
1884 return -1;
1888 return 0; // success
1892 TAO_Transport::handle_input_missing_data (TAO_Resume_Handle &rh,
1893 ACE_Time_Value * max_wait_time,
1894 TAO_Queued_Data *q_data)
1896 // paranoid check
1897 if (q_data == nullptr)
1899 return -1;
1902 if (TAO_debug_level > 3)
1904 TAOLIB_DEBUG ((LM_DEBUG,
1905 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_missing_data_message, ")
1906 ACE_TEXT ("enter (missing data == %d)\n"),
1907 this->id (), q_data->missing_data ()));
1910 size_t const recv_size = q_data->missing_data ();
1912 if (q_data->msg_block ()->space() < recv_size)
1914 // make sure the message_block has enough space
1915 size_t const message_size = recv_size + q_data->msg_block ()->length();
1917 if (ACE_CDR::grow (q_data->msg_block (), message_size) == -1)
1919 return -1;
1923 // Saving the size of the received buffer in case any one needs to
1924 // get the size of the message that is received in the
1925 // context. Obviously the value will be changed for each recv call
1926 // and the user is supposed to invoke the accessor only in the
1927 // invocation context to get meaningful information.
1928 this->recv_buffer_size_ = recv_size;
1930 // Read the message into the existing message block on heap
1931 ssize_t const n = this->recv (q_data->msg_block ()->wr_ptr(),
1932 recv_size,
1933 max_wait_time);
1935 if (n <= 0)
1937 return ACE_Utils::truncate_cast<int> (n);
1940 if (TAO_debug_level > 3)
1942 TAOLIB_DEBUG ((LM_DEBUG,
1943 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_missing_data_message, ")
1944 ACE_TEXT ("read bytes %d\n"),
1945 this->id (), n));
1948 q_data->msg_block ()->wr_ptr(n);
1949 q_data->missing_data (q_data->missing_data () - n);
1951 if (q_data->missing_data () == 0)
1953 // paranoid check
1954 if (this->incoming_message_stack_.pop (q_data) == -1)
1956 return -1;
1959 if (this->consolidate_process_message (q_data, rh) == -1)
1961 return -1;
1965 return 0;
1970 TAO_Transport::handle_input_parse_extra_messages (
1971 ACE_Message_Block &message_block)
1973 // store buffer status of last extraction: -1 parse error, 0
1974 // incomplete message header in buffer, 1 complete messages header
1975 // parsed
1976 int buf_status = 0;
1978 TAO_Queued_Data *q_data = nullptr; // init
1980 // parse buffer until all messages have been extracted, consolidate
1981 // and enqueue complete messages, if the last message being parsed
1982 // has missin data, it is stays on top of incoming_message_stack.
1983 while (message_block.length () > 0 &&
1984 (buf_status = this->messaging_object ()->extract_next_message
1985 (message_block, q_data)) != -1 &&
1986 q_data != nullptr) // paranoid check
1988 if (q_data->missing_data () == 0)
1990 if (this->consolidate_enqueue_message (q_data) == -1)
1992 return -1;
1995 else // incomplete message read, probably the last message in buffer
1997 // can not fail
1998 this->incoming_message_stack_.push (q_data);
2001 q_data = nullptr; // reset
2002 } // while
2004 if (buf_status == -1)
2006 return -1;
2009 return 0;
// Read new data from the connection into a buffer on the stack and
// parse the message(s) it contains.
//
// Outline, as implemented below:
//  1. Wrap a local char buffer in an ACE_Data_Block/ACE_Message_Block
//     pair (both created with DONT_DELETE) and align it.
//  2. Choose how much to read: the whole free space when the single
//     read optimization is enabled, otherwise only what is needed to
//     complete a GIOP header.
//  3. Copy a saved partial_message_, if any, to the front of the
//     buffer and reduce the read size by its length.
//  4. recv(); a result <= 0 is returned to the caller as is.
//  5. If the top of incoming_message_stack_ has an undefined amount of
//     missing data, consolidate the new bytes into that node.
//     Otherwise parse the first message in place on the stack; further
//     messages in the buffer are extracted and enqueued.
//  6. Bytes left over in the buffer are saved in partial_message_ for
//     the next call.
//
// Returns 0 on success, -1 on error, or the non-positive recv() result.
2013 TAO_Transport::handle_input_parse_data (TAO_Resume_Handle &rh,
2014 ACE_Time_Value * max_wait_time)
2016 if (TAO_debug_level > 3)
2018 TAOLIB_DEBUG ((LM_DEBUG,
2019 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
2020 ACE_TEXT ("enter\n"),
2021 this->id ()));
2024 // The buffer on the stack which will be used to hold the input
2025 // messages, ACE_CDR::MAX_ALIGNMENT compensates the
2026 // memory-alignment. This improves performance with SUN-Java-ORB-1.4
2027 // and higher that sends fragmented requests of size 1024 bytes.
2028 char buf [TAO_MAXBUFSIZE + ACE_CDR::MAX_ALIGNMENT];
2030 #if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE)
2031 (void) ACE_OS::memset (buf,
2032 '\0',
2033 sizeof buf);
2034 #endif /* ACE_INITIALIZE_MEMORY_BEFORE_USE */
2036 // Create a data block
2037 ACE_Data_Block db (sizeof (buf),
2038 ACE_Message_Block::MB_DATA,
2039 buf,
2040 this->orb_core_->input_cdr_buffer_allocator (),
2041 this->orb_core_->locking_strategy (),
2042 ACE_Message_Block::DONT_DELETE,
2043 this->orb_core_->input_cdr_dblock_allocator ());
2045 // Create a message block
2046 ACE_Message_Block message_block (&db,
2047 ACE_Message_Block::DONT_DELETE,
2048 this->orb_core_->input_cdr_msgblock_allocator ());
2050 // Align the message block
2051 ACE_CDR::mb_align (&message_block);
2053 size_t recv_size = 0; // Note: unsigned integer
2055 // Pointer to newly parsed message
2056 TAO_Queued_Data *q_data = nullptr;
2058 // Optimizing access of constants
2059 size_t const header_length = this->messaging_object ()->header_length ();
2061 // Paranoid check
2062 if (header_length > message_block.space ())
2064 return -1;
2067 if (this->orb_core_->orb_params ()->single_read_optimization ())
2069 recv_size = message_block.space ();
2071 else
2073 // Single read optimization has been de-activated. That means
2074 // that we need to read from transport the GIOP header first
2075 // before the payload. This codes first checks the incoming
2076 // stack for partial messages which needs to be
2077 // consolidated. Otherwise we are in new cycle, reading complete
2078 // GIOP header of new incoming message.
2079 if (this->incoming_message_stack_.top (q_data) != -1
2080 && q_data->missing_data () == TAO_MISSING_DATA_UNDEFINED)
2082 // There is a partial message on incoming_message_stack_
2083 // whose length is unknown so far. We need to consolidate
2084 // the GIOP header to get to know the payload size,
2085 recv_size = header_length - q_data->msg_block ()->length ();
2087 else
2089 // Read amount of data forming GIOP header of new incoming
2090 // message.
2091 recv_size = header_length;
2093 // POST: 0 <= recv_size <= header_length
2095 // POST: 0 <= recv_size <= message_block->space ()
2097 // If we have a partial message, copy it into our message block and
2098 // clear out the partial message.
2099 if (this->partial_message_ != nullptr && this->partial_message_->length () > 0)
2101 // (*) Copy back the partial message into current read-buffer,
2102 // verify that the read-strategy of "recv_size" bytes is not
2103 // exceeded. The latter check guarantees that recv_size does not
2104 // roll-over and keeps in range
2105 // 0<=recv_size<=message_block->space()
2106 if (this->partial_message_->length () <= recv_size &&
2107 message_block.copy (this->partial_message_->rd_ptr (),
2108 this->partial_message_->length ()) == 0)
2110 recv_size -= this->partial_message_->length ();
2111 // reset is done later to avoid problem in case of EWOULDBLOCK
2112 // or EAGAIN errno
2114 else
2116 return -1;
2119 // POST: 0 <= recv_size <= buffer_space
2121 if (0 >= recv_size) // paranoid: the check above (*) guarantees recv_size>=0
2123 // This event would cause endless looping, trying frequently to
2124 // read zero bytes from stream. This might happen, if TAOs
2125 // protocol implementation is not correct and tries to read data
2126 // beyond header without "single_read_optimazation" being
2127 // activated.
2128 if (TAO_debug_level > 0)
2130 TAOLIB_ERROR ((LM_ERROR,
2131 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
2132 ACE_TEXT ("Error - endless loop detection, closing connection"),
2133 this->id ()));
2135 if (this->partial_message_ != nullptr && this->partial_message_->length () > 0)
2137 this->partial_message_->reset ();
2139 return -1;
2142 // Saving the size of the received buffer in case any one needs to
2143 // get the size of the message thats received in the
2144 // context. Obviously the value will be changed for each recv call
2145 // and the user is supposed to invoke the accessor only in the
2146 // invocation context to get meaningful information.
2147 this->recv_buffer_size_ = recv_size;
2149 // Read the message into the message block that we have created on
2150 // the stack.
2151 ssize_t const n = this->recv (message_block.wr_ptr (),
2152 recv_size,
2153 max_wait_time);
2155 // If there is an error return to the reactor..
2156 // do not reset partial message in case of n == 0 (EWOULDBLOCK || EAGAIN),
2157 // we will need it during next try
2158 if (n <= 0)
2160 if ((n < 0) &&
2161 (this->partial_message_ != nullptr && this->partial_message_->length () > 0))
2163 this->partial_message_->reset ();
2166 return ACE_Utils::truncate_cast<int> (n);
// The read succeeded and the saved partial message was already copied
// into message_block above, so it can be cleared now.
2169 if (this->partial_message_ != nullptr && this->partial_message_->length () > 0)
2171 this->partial_message_->reset ();
2174 if (TAO_debug_level > 3)
2176 TAOLIB_DEBUG ((LM_DEBUG,
2177 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
2178 ACE_TEXT ("read %d bytes\n"),
2179 this->id (), n));
2182 // Set the write pointer in the stack buffer
2183 message_block.wr_ptr (n);
2186 // STACK PROCESSING OR MESSAGE CONSOLIDATION
2189 // PRE: data in buffer is aligned && message_block.length() > 0
2191 if (this->incoming_message_stack_.top (q_data) != -1
2192 && q_data->missing_data () == TAO_MISSING_DATA_UNDEFINED)
2195 // MESSAGE CONSOLIDATION
2198 // Partial message on incoming_message_stack_ needs to be
2199 // consolidated. The message header could not be parsed so far
2200 // and therefor the message size is unknown yet. Consolidating
2201 // the message destroys the memory alignment of succeeding
2202 // messages sharing the buffer, for that reason consolidation
2203 // and stack based processing are mutial exclusive.
2204 if (this->messaging_object ()->consolidate_node (q_data,
2205 message_block) == -1)
2207 if (TAO_debug_level > 0)
2209 TAOLIB_ERROR ((LM_ERROR,
2210 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
2211 ACE_TEXT ("error consolidating message from input buffer\n"),
2212 this->id () ));
2214 return -1;
2217 // Complete message are to be enqueued and later processed
2218 if (q_data->missing_data () == 0)
2220 if (this->incoming_message_stack_.pop (q_data) == -1)
2222 return -1;
2225 if (this->consolidate_enqueue_message (q_data) == -1)
2227 return -1;
2231 if (message_block.length () > 0
2232 && this->handle_input_parse_extra_messages (message_block) == -1)
2234 return -1;
2237 // In any case try to process the enqueued messages
2238 if (this->process_queue_head (rh) == -1)
2240 return -1;
2243 else
2246 // STACK PROCESSING (critical path)
2249 // Process the first message in buffer on stack
2251 // (PRE: first message resides in aligned memory) Make a node of
2252 // the message-block..
2254 TAO_Queued_Data qd (&message_block,
2255 this->orb_core_->transport_message_buffer_allocator ());
2257 size_t mesg_length = 0;
2259 if (this->messaging_object ()->parse_next_message (qd, mesg_length) == -1
2260 || (qd.missing_data () == 0
2261 && mesg_length > message_block.length ()) )
2263 // extracting message failed
2264 return -1;
2266 // POST: qd.missing_data_ == 0 --> mesg_length <= message_block.length()
2267 // This prevents seeking rd_ptr behind the wr_ptr
2269 if (qd.missing_data () != 0 ||
2270 qd.more_fragments () ||
2271 qd.msg_type () == GIOP::Fragment)
2273 if (qd.missing_data () == 0)
2275 // Dealing with a fragment
2276 TAO_Queued_Data *nqd = TAO_Queued_Data::duplicate (qd);
2278 if (nqd == nullptr)
2280 return -1;
2283 // mark the end of message in new buffer
2284 char* end_mark = nqd->msg_block ()->rd_ptr ()
2285 + mesg_length;
2286 nqd->msg_block ()->wr_ptr (end_mark);
2288 // move the read pointer forward in old buffer
2289 message_block.rd_ptr (mesg_length);
2291 // enqueue the message
2292 if (this->consolidate_enqueue_message (nqd) == -1)
2294 return -1;
2297 if (message_block.length () > 0
2298 && this->handle_input_parse_extra_messages (message_block) == -1)
2300 return -1;
2303 // In any case try to process the enqueued messages
2304 if (this->process_queue_head (rh) == -1)
2306 return -1;
2309 else if (qd.missing_data () != TAO_MISSING_DATA_UNDEFINED)
2311 // Incomplete message, must be the last one in buffer
2313 if (qd.missing_data () != TAO_MISSING_DATA_UNDEFINED &&
2314 qd.missing_data () > message_block.space ())
2316 // Re-Allocate correct size on heap
2317 if (ACE_CDR::grow (qd.msg_block (),
2318 message_block.length ()
2319 + qd.missing_data ()) == -1)
2321 return -1;
2325 TAO_Queued_Data *nqd = TAO_Queued_Data::duplicate (qd);
2327 if (nqd == nullptr)
2329 return -1;
2332 // move read-pointer to end of buffer
2333 message_block.rd_ptr (message_block.length());
2335 this->incoming_message_stack_.push (nqd);
2338 else
2341 // critical path
2344 // We cant process the message on stack right now. First we
2345 // have got to parse extra messages from message_block,
2346 // putting them into queue. When this is done we can return
2347 // to process this message, and notifying other threads to
2348 // process the messages in queue.
2349 char * end_marker = message_block.rd_ptr ()
2350 + mesg_length;
2352 if (message_block.length () > mesg_length)
2354 // There are more message in data stream to be parsed.
2355 // Safe the rd_ptr to restore later.
2356 char *rd_ptr_stack_mesg = message_block.rd_ptr ();
2358 // Skip parsed message, jump to next message in buffer
2359 // PRE: mesg_length <= message_block.length ()
2360 message_block.rd_ptr (mesg_length);
2362 // Extract remaining messages and enqueue them for later
2363 // heap processing
2364 if (this->handle_input_parse_extra_messages (message_block) == -1)
2366 return -1;
2369 // correct the wr_ptr using the end_marker to point to the
2370 // end of the first message else the code after this will
2371 // see the full stream with all the messages
2372 message_block.wr_ptr (end_marker);
2374 // Restore rd_ptr
2375 message_block.rd_ptr (rd_ptr_stack_mesg);
2378 // The following if-else has been copied from
2379 // process_queue_head(). While process_queue_head()
2380 // processes message on heap, here we will process a message
2381 // on stack.
2383 // Now that we have one message on stack to be processed,
2384 // check whether we have one more message in the queue...
2385 if (this->incoming_message_queue_.queue_length () > 0)
2387 if (TAO_debug_level > 0)
2389 TAOLIB_DEBUG ((LM_DEBUG,
2390 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::handle_input_parse_data, ")
2391 ACE_TEXT ("notify reactor\n"),
2392 this->id ()));
2395 int const retval = this->notify_reactor ();
2397 if (retval == 1)
2399 // Let the class know that it doesn't need to resume the
2400 // handle..
2401 rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED);
2403 else if (retval < 0)
2404 return -1;
2406 else
2408 // As there are no further messages in queue just resume
2409 // the handle. Set the flag incase someone had reset the flag..
2410 rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE);
2413 // PRE: incoming_message_queue is empty
2414 if (this->process_parsed_messages (&qd, rh) == -1)
2416 return -1;
2418 // move the rd_ptr tp position of end_marker
2419 message_block.rd_ptr (end_marker);
2423 // Now that all cases have been processed, there might be kept some data
2424 // in buffer that needs to be safed for next "handle_input" invocations.
2425 if (message_block.length () > 0)
2427 if (this->partial_message_ == nullptr)
2429 this->allocate_partial_message_block ();
2432 if (this->partial_message_ != nullptr &&
2433 this->partial_message_->copy (message_block.rd_ptr (),
2434 message_block.length ()) == 0)
2436 message_block.rd_ptr (message_block.length ());
2438 else
2440 return -1;
2444 return 0;
2449 TAO_Transport::process_parsed_messages (TAO_Queued_Data *qd,
2450 TAO_Resume_Handle &rh)
2452 if (TAO_debug_level > 7)
2454 TAOLIB_DEBUG ((LM_DEBUG,
2455 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
2456 ACE_TEXT ("entering (missing data == %d)\n"),
2457 this->id(), qd->missing_data ()));
2460 #if TAO_HAS_TRANSPORT_CURRENT == 1
2461 // Update stats, if any
2462 if (this->stats_ != nullptr)
2463 this->stats_->messages_received (qd->msg_block ()->length ());
2464 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */
2466 switch (qd->msg_type ())
2468 case GIOP::CloseConnection:
2470 if (TAO_debug_level > 0)
2472 TAOLIB_DEBUG ((LM_DEBUG,
2473 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
2474 ACE_TEXT ("received CloseConnection message - %m\n"),
2475 this->id()));
2478 // Return a "-1" so that the next stage can take care of
2479 // closing connection and the necessary memory management.
2480 return -1;
2482 break;
2483 case GIOP::Request:
2484 case GIOP::LocateRequest:
2486 // Let us resume the handle before we go ahead to process the
2487 // request. This will open up the handle for other threads.
2488 rh.resume_handle ();
2490 if (this->messaging_object ()->process_request_message (this, qd) == -1)
2492 // Return a "-1" so that the next stage can take care of
2493 // closing connection and the necessary memory management.
2494 return -1;
2497 break;
2498 case GIOP::Reply:
2499 case GIOP::LocateReply:
2501 rh.resume_handle ();
2503 TAO_Pluggable_Reply_Params params (this);
2505 if (this->messaging_object ()->process_reply_message (params, qd) == -1)
2507 if (TAO_debug_level > 0)
2509 TAOLIB_ERROR ((LM_ERROR,
2510 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
2511 ACE_TEXT ("error in process_reply_message - %m\n"),
2512 this->id ()));
2515 return -1;
2519 break;
2520 case GIOP::CancelRequest:
2522 // The associated request might be incomplete residing
2523 // fragmented in messaging object. We must make sure the
2524 // resources allocated by fragments are released.
2525 if (this->messaging_object ()->discard_fragmented_message (qd) == -1)
2527 if (TAO_debug_level > 0)
2529 TAOLIB_ERROR ((LM_ERROR,
2530 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
2531 ACE_TEXT ("error processing CancelRequest\n"),
2532 this->id ()));
2536 // We are not able to cancel requests being processed already;
2537 // this is declared as optional feature by CORBA, and TAO does
2538 // not support this currently.
2540 // Just continue processing, CancelRequest does not mean to cut
2541 // off the connection.
2543 break;
2544 case GIOP::MessageError:
2546 if (TAO_debug_level > 0)
2548 TAOLIB_ERROR ((LM_ERROR,
2549 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_parsed_messages, ")
2550 ACE_TEXT ("received MessageError, closing connection\n"),
2551 this->id ()));
2553 return -1;
2555 break;
2556 case GIOP::Fragment:
2558 // Nothing to be done.
2560 break;
2563 // If not, just return back..
2564 return 0;
2568 TAO_Transport::process_queue_head (TAO_Resume_Handle &rh)
2570 if (TAO_debug_level > 3)
2572 TAOLIB_DEBUG ((LM_DEBUG,
2573 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_queue_head, %d enqueued\n"),
2574 this->id (), this->incoming_message_queue_.queue_length () ));
2577 // See if message in queue ...
2578 if (this->incoming_message_queue_.queue_length () > 0)
2580 // Get the message on the head of the queue..
2581 TAO_Queued_Data *qd =
2582 this->incoming_message_queue_.dequeue_head ();
2584 if (TAO_debug_level > 3)
2586 TAOLIB_DEBUG ((LM_DEBUG,
2587 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_queue_head, ")
2588 ACE_TEXT ("the size of the queue is [%d]\n"),
2589 this->id (),
2590 this->incoming_message_queue_.queue_length()));
2592 // Now that we have pulled out out one message out of the queue,
2593 // check whether we have one more message in the queue...
2594 if (this->incoming_message_queue_.queue_length () > 0)
2596 if (TAO_debug_level > 0)
2598 TAOLIB_DEBUG ((LM_DEBUG,
2599 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::process_queue_head, ")
2600 ACE_TEXT ("notify reactor\n"),
2601 this->id ()));
2604 int const retval = this->notify_reactor ();
2606 if (retval == 1)
2608 // Let the class know that it doesn't need to resume the
2609 // handle..
2610 rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_LEAVE_SUSPENDED);
2612 else if (retval < 0)
2613 return -1;
2615 else
2617 // As we are ready to process the last message just resume
2618 // the handle. Set the flag incase someone had reset the flag..
2619 rh.set_flag (TAO_Resume_Handle::TAO_HANDLE_RESUMABLE);
2622 // Process the message...
2623 int const retval = this->process_parsed_messages (qd, rh);
2625 // Delete the Queued_Data..
2626 TAO_Queued_Data::release (qd);
2628 return retval;
2631 return 1;
2635 TAO_Transport::notify_reactor_now ()
2637 ACE_Event_Handler *eh = this->event_handler_i ();
2639 // Get the reactor associated with the event handler
2640 ACE_Reactor *reactor = this->orb_core ()->reactor ();
2642 if (TAO_debug_level > 0)
2644 TAOLIB_DEBUG ((LM_DEBUG,
2645 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::notify_reactor, ")
2646 ACE_TEXT ("notify to Reactor\n"),
2647 this->id ()));
2650 // Send a notification to the reactor...
2651 int const retval = reactor->notify (eh, ACE_Event_Handler::READ_MASK);
2653 if (retval < 0 && TAO_debug_level > 2)
2655 // @todo: need to think about what is the action that
2656 // we can take when we get here.
2657 TAOLIB_ERROR ((LM_ERROR,
2658 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::notify_reactor, ")
2659 ACE_TEXT ("notify to the reactor failed..\n"),
2660 this->id ()));
2663 return 1;
2666 TAO::Transport_Cache_Manager &
2667 TAO_Transport::transport_cache_manager ()
2669 return this->orb_core_->lane_resources ().transport_cache ();
2672 void
2673 TAO_Transport::assign_translators (TAO_InputCDR *inp, TAO_OutputCDR *outp)
2675 if (this->char_translator_)
2677 this->char_translator_->assign (inp);
2678 this->char_translator_->assign (outp);
2680 if (this->wchar_translator_)
2682 this->wchar_translator_->assign (inp);
2683 this->wchar_translator_->assign (outp);
2687 void
2688 TAO_Transport::clear_translators (TAO_InputCDR *inp, TAO_OutputCDR *outp)
2690 if (inp)
2692 inp->char_translator (nullptr);
2693 inp->wchar_translator (nullptr);
2695 if (outp)
2697 outp->char_translator (nullptr);
2698 outp->wchar_translator (nullptr);
2702 ACE_Event_Handler::Reference_Count
2703 TAO_Transport::add_reference ()
2705 return this->event_handler_i ()->add_reference ();
2708 ACE_Event_Handler::Reference_Count
2709 TAO_Transport::remove_reference ()
2711 return this->event_handler_i ()->remove_reference ();
2714 TAO_OutputCDR &
2715 TAO_Transport::out_stream ()
2717 return this->messaging_object ()->out_stream ();
2720 TAO_SYNCH_MUTEX &
2721 TAO_Transport::output_cdr_lock ()
2723 return this->output_cdr_mutex_;
2726 void
2727 TAO_Transport::messaging_init (TAO_GIOP_Message_Version const &version)
2729 this->messaging_object ()->init (version.major, version.minor);
2732 void
2733 TAO_Transport::pre_close ()
2735 if (TAO_debug_level > 9)
2737 TAOLIB_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::pre_close\n"),
2738 this->id_));
2740 // @TODO: something needs to be done with is_connected_. Checking it is
2741 // guarded by a mutex, but setting it is not. Until the need for mutexed
2742 // protection is required, the transport cache is holding its own copy
2743 // of the is_connected_ flag, so that during cache lookups the cache
2744 // manager doesn't need to be burdened by the lock in is_connected().
2745 this->is_connected_ = false;
2746 this->transport_cache_manager ().mark_connected (this->cache_map_entry_,
2747 false);
2748 this->purge_entry ();
2750 ACE_MT (ACE_GUARD (ACE_Lock, guard, *this->handler_lock_));
2751 this->cleanup_queue_i ();
2755 bool
2756 TAO_Transport::post_open (size_t id)
2758 if (TAO_debug_level > 9)
2760 TAOLIB_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - Transport::post_open, ")
2761 ACE_TEXT ("transport id changed from [%d] to [%d]\n"), this->id_, id));
2763 this->id_ = id;
2765 // When we have data in our outgoing queue schedule ourselves
2766 // for output
2767 if (!this->queue_is_empty_i ())
2769 // If the wait strategy wants us to be registered with the reactor
2770 // then we do so. If registration is required and it succeeds,
2771 // #REFCOUNT# becomes two.
2772 if (this->wait_strategy ()->register_handler () == 0)
2774 if (this->flush_in_post_open_)
2776 TAO_Flushing_Strategy *flushing_strategy =
2777 this->orb_core ()->flushing_strategy ();
2779 if (flushing_strategy == nullptr)
2780 throw CORBA::INTERNAL ();
2782 this->flush_in_post_open_ = false;
2783 (void)flushing_strategy->schedule_output (this);
2786 else
2788 // Registration failures.
2790 // Purge from the connection cache, if we are not in the cache, this
2791 // just does nothing.
2792 (void) this->purge_entry ();
2794 // Close the handler.
2795 (void) this->close_connection ();
2797 if (TAO_debug_level > 0)
2799 TAOLIB_ERROR ((LM_ERROR,
2800 ACE_TEXT ("TAO (%P|%t) - Transport[%d]::post_open , ")
2801 ACE_TEXT ("could not register the transport ")
2802 ACE_TEXT ("in the reactor.\n"),
2803 this->id ()));
2806 return false;
2811 ACE_GUARD_RETURN (ACE_Lock, ace_mon, *this->handler_lock_, false);
2812 this->is_connected_ = true;
2815 if (TAO_debug_level > 9)
2817 TAOLIB_DEBUG ((LM_DEBUG, ACE_TEXT ("TAO (%P|%t) - Transport[%d]::post_open")
2818 ACE_TEXT (", cache_map_entry_ is [%@]\n"), this->id_, this->cache_map_entry_));
2821 this->transport_cache_manager ().mark_connected (this->cache_map_entry_,
2822 true);
2824 // update transport cache to make this entry available
2825 this->transport_cache_manager ().set_entry_state (
2826 this->cache_map_entry_,
2827 TAO::ENTRY_IDLE_AND_PURGABLE);
2829 return true;
2832 void
2833 TAO_Transport::allocate_partial_message_block ()
2835 if (this->partial_message_ == nullptr)
2837 // This value must be at least large enough to hold a GIOP message
2838 // header plus a GIOP fragment header
2839 size_t const partial_message_size =
2840 this->messaging_object ()->header_length ();
2841 // + this->messaging_object ()->fragment_header_length ();
2842 // deprecated, conflicts with not-single_read_opt.
2844 ACE_NEW (this->partial_message_,
2845 ACE_Message_Block (partial_message_size));
2849 void
2850 TAO_Transport::set_bidir_context_info (TAO_Operation_Details &)
2854 ACE_Time_Value const *
2855 TAO_Transport::io_timeout(
2856 TAO::Transport::Drain_Constraints const & dc) const
2858 if (dc.block_on_io())
2860 return dc.timeout();
2862 if (this->wait_strategy()->can_process_upcalls())
2864 return nullptr;
2866 return dc.timeout();
2869 bool
2870 TAO_Transport::using_blocking_io_for_synch_messages () const
2872 if (this->wait_strategy()->can_process_upcalls())
2874 return false;
2876 return true;
2879 bool
2880 TAO_Transport::using_blocking_io_for_asynch_messages () const
2882 return false;
2885 bool
2886 TAO_Transport::connection_closed_on_read () const
2888 return connection_closed_on_read_;
2891 TAO_END_VERSIONED_NAMESPACE_DECL