Revert "Minor modernization of DynamicAny code"
[ACE_TAO.git] / TAO / tao / Transport.h
blob4268a2767464f247e52dd84a5dc0d35317003b92
1 // -*- C++ -*-
3 //=============================================================================
4 /**
5 * @file Transport.h
7 * Define the interface for the Transport component in TAO's
8 * pluggable protocol framework.
10 * @author Fred Kuhns <fredk@cs.wustl.edu>
12 //=============================================================================
14 #ifndef TAO_TRANSPORT_H
15 #define TAO_TRANSPORT_H
17 #include /**/ "ace/pre.h"
19 #include "tao/Transport_Cache_Manager.h"
21 #if !defined (ACE_LACKS_PRAGMA_ONCE)
22 # pragma once
23 #endif /* ACE_LACKS_PRAGMA_ONCE */
25 #include "tao/Transport_Timer.h"
26 #include "tao/Incoming_Message_Queue.h"
27 #include "tao/Incoming_Message_Stack.h"
28 #include "tao/Message_Semantics.h"
29 #include "ace/Time_Value.h"
30 #include "ace/Basic_Stats.h"
32 struct iovec;
34 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
36 class TAO_ORB_Core;
37 class TAO_Target_Specification;
38 class TAO_Operation_Details;
39 class TAO_Transport_Mux_Strategy;
40 class TAO_Wait_Strategy;
41 class TAO_Connection_Handler;
42 class TAO_GIOP_Message_Base;
43 class TAO_Codeset_Translator_Base;
45 class TAO_Queued_Message;
46 class TAO_Synch_Queued_Message;
47 class TAO_Resume_Handle;
48 class TAO_Stub;
49 class TAO_MMAP_Allocator;
50 class TAO_ServerRequest;
52 namespace TAO
54 /**
55 * @note Should this be in TAO namespace. Seems like a candidate
56 * that should be in the transport
58 enum Connection_Role
60 TAO_UNSPECIFIED_ROLE = 0,
61 TAO_SERVER_ROLE = 1,
62 TAO_CLIENT_ROLE = 2
65 namespace Transport
67 /// Transport-level statistics. Initially introduced to support
68 /// the "Transport Current" functionality.
69 class Stats;
71 /**
72 * @struct Drain_Constraints
74 * @brief Encapsulate the flushing control parameters.
76 * At several points, the ORB needs to flush data from a transport to the
77 * underlying I/O mechanisms. How this data is flushed depends on the
78 * context where the request is made, the ORB configuration and the
79 * application level policies in effect.
81 * Some examples:
83 * # When idle, the ORB will want to send data on any socket that has
84 * space available. In this case, the queue must be drained on
85 * a best-effort basis, without any blocking.
86 * # If the ORB is configured to handle nested upcalls, any two-way
87 * request should block and push data to the underlying socket as fast
88 * as possible.
89 * # In the same use-case, but now with a timeout policy in
90 * effect, the ORB will need to send the data use I/O operations with
91 * timeouts (as implemented by ACE::sendv()
92 * # When the ORB is configured to support nested upcalls, any two-way,
93 * reliable oneway or similar should wait using the reactor or
94 * Leader-Follower implementation. While still respecting the timeout
95 * policies.
97 * Instead of sprinkling if() statements throughput the critical path
98 * trying to determine how the I/O operations should be performed, we
99 * pass the information encapsulated in this class. The caller into the
100 * Transport object determines the right parameters to use, and the
101 * Transport object simply obeys those instructions.
103 class Drain_Constraints
105 public:
106 /// Default constructor
107 Drain_Constraints()
108 : timeout_(0)
109 , block_on_io_(false)
113 /// Constructor
114 Drain_Constraints(
115 ACE_Time_Value * timeout,
116 bool block_on_io)
117 : timeout_(timeout)
118 , block_on_io_(block_on_io)
123 * If true, then the ORB should block on I/O operations instead of
124 * using non-blocking I/O.
126 bool block_on_io() const
128 return block_on_io_;
132 * The maximum time to block on I/O operations (or nested loops) based
133 * on the current timeout policies.
135 ACE_Time_Value * timeout() const
137 return timeout_;
140 private:
141 Drain_Constraints (const Drain_Constraints &) = delete;
142 Drain_Constraints &operator= (const Drain_Constraints &) = delete;
144 ACE_Time_Value * timeout_;
145 bool block_on_io_;
151 * @class TAO_Transport
153 * @brief Generic definitions for the Transport class.
155 * The transport object is created in the Service handler
156 * constructor and deleted in the Service Handler's destructor!!
158 * The main responsibility of a Transport object is to encapsulate a
159 * connection, and provide a transport independent way to send and
160 * receive data. Since TAO is heavily based on the Reactor for all if
161 * not all its I/O the Transport class is usually implemented with a
162 * helper Connection Handler that adapts the generic Transport
163 * interface to the Reactor types.
165 * <H3>The outgoing data path:</H3>
167 * One of the responsibilities of the TAO_Transport class is to send
168 * out GIOP messages as efficiently as possible. In most cases
169 * messages are put out in FIFO order, the transport object will put
170 * out the message using a single system call and return control to
171 * the application. However, for oneways and AMI requests it may be
172 * more efficient (or required if the SYNC_NONE policy is in effect)
173 * to queue the messages until a large enough data set is available.
174 * Another reason to queue is that some applications cannot block for
175 * I/O, yet they want to send messages so large that a single write()
176 * operation would not be able to cope with them. In such cases we
177 * need to queue the data and use the Reactor to drain the queue.
179 * Therefore, the Transport class may need to use a queue to
180 * temporarily hold the messages, and, in some configurations, it may
181 * need to use the Reactor to concurrently drain such queues.
183 * <H4>Out of order messages:</H4> TAO provides explicit policies to
184 * send 'urgent' messages. Such messages may put at the head of the
185 * queue. However, they cannot be sent immediately because the
186 * transport may already be sending another message in a reactive
187 * fashion.
189 * Consequently, the Transport must also know if the head of the queue
190 * has been partially sent. In that case new messages can only follow
191 * the head. Only once the head is completely sent we can start
192 * sending new messages.
194 * <H4>Waiting threads:</H4> One or more threads can be blocked
195 * waiting for the connection to completely send the message.
196 * The thread should return as soon as its message has been sent, so a
197 * per-thread condition is required. This suggest that simply using a
198 * ACE_Message_Queue would not be enough: there is a significant
199 * amount of ancillary information, to keep on each message that the
200 * Message_Block class does not provide room for.
202 * Blocking I/O is still attractive for some applications. First, my
203 * eliminating the Reactor overhead performance is improved when
204 * sending large blocks of data. Second, using the Reactor to send
205 * out data opens the door for nested upcalls, yet some applications
206 * cannot deal with the reentrancy issues in this case.
208 * <H4>Timeouts:</H4> Some or all messages could have a timeout period
209 * attached to them. The timeout source could either be some
210 * high-level policy or maybe some strategy to prevent denial of
211 * service attacks. In any case the timeouts are per-message, and
212 * later messages could have shorter timeouts.
213 * In fact, some kind of scheduling (such as EDF) could be required in
214 * a few applications.
216 * <H4>Conclusions:</H4> The outgoing data path consist in several
217 * components:
219 * - A queue of pending messages
220 * - A message currently being transmitted
221 * - A per-transport 'send strategy' to choose between blocking on
222 * write, blocking on the reactor or blocking on leader/follower.
223 * - A per-message 'waiting object'
224 * - A per-message timeout
226 * The Transport object provides a single method to send request
227 * messages (send_request_message ()).
229 * <H3>The incoming data path:</H3>
231 * One of the main responsibilities of the transport is to read and
232 * process the incoming GIOP message as quickly and efficiently as
233 * possible. There are other forces that needs to be given due
234 * consideration. They are
235 * - Multiple threads should be able to traverse along the same data
236 * path but should not be able to read from the same handle at the
237 * same time ie. the handle should not be shared between threads at
238 * any instant.
239 * - Reads on the handle could give one or more messages.
240 * - Minimize locking and copying overhead when trying to attack the
241 * above.
243 * <H3>Parsing messages (GIOP) & processing the message:</H3>
245 * The messages should be checked for validity and the right
246 * information should be sent to the higher layer for processing. The
247 * process of doing a sanity check and preparing the messages for the
248 * higher layers of the ORB are done by the messaging protocol.
250 * <H3>Design forces and Challenges </H3>
252 * To keep things as efficient as possible for medium sized requests,
253 * it would be good to minimize data copying and locking along the
254 * incoming path ie. from the time of reading the data from the handle
255 * to the application. We achieve this by creating a buffer on stack
256 * and reading the data from the handle into the buffer. We then pass
257 * the same data block (the buffer is encapsulated into a data block)
258 * to the higher layers of the ORB. The problems stem from the
259 * following
260 * (a) Data is bigger than the buffer that we have on stack
261 * (b) Transports like TCP do not guarantee availability of the whole
262 * chunk of data in one shot. Data could trickle in byte by byte.
263 * (c) Single read gives multiple messages
265 * We solve the problems as follows
267 * (a) First do a read with the buffer on stack. Query the underlying
268 * messaging object whether the message has any incomplete
269 * portion. If so, data will be copied into new buffer being able
270 * to hold full message and is queued; succeeding events will read
271 * data from socket and write directly into this buffer.
272 * Otherwise, if if the message in local buffer is complete, we free
273 * the handle and then send the message to the higher layers of the
274 * ORB for processing.
276 * (b) If buffer with incomplete message has been enqueued, while trying
277 * to do the above, the reactor will call us back when the handle
278 * becomes read ready. The read-operation will copy data directly
279 * into the enqueued buffer. If the message has bee read completely
280 * the message is sent to the higher layers of the ORB for processing.
282 * (c) If we get multiple messages (possible if the client connected
283 * to the server sends oneways or AMI requests), we parse and
284 * split the messages. Every message is put in the queue. Once
285 * the messages are queued, the thread picks up one message to
286 * send to the higher layers of the ORB. Before doing that, if
287 * it finds more messages, it sends a notify to the reactor
288 * without resuming the handle. The next thread picks up a
289 * message from the queue and processes that. Once the queue
290 * is drained the last thread resumes the handle.
292 * <H3>Sending Replies </H3>
294 * We could use the outgoing path of the ORB to send replies. This
295 * would allow us to reuse most of the code in the outgoing data
296 * path. We were doing this till TAO-1.2.3. We run in to
297 * problems. When writing the reply the ORB gets flow controlled, and the
298 * ORB tries to flush the message by going into the reactor. This
299 * resulted in unnecessary nesting. The thread that gets into the
300 * Reactor could potentially handle other messages (incoming or
301 * outgoing) and the stack starts growing leading to crashes.
303 * <H4>Solution to the nesting problem </H4>
305 * The solution that we (plan to) adopt is pretty straight
306 * forward. The thread sending replies will not block to send the
307 * replies but queue the replies and return to the Reactor. (Note the
308 * careful usages of the terms "blocking in the Reactor" as opposed to
309 * "return back to the Reactor".
311 * <B>See Also:</B>
313 * https://htmlpreview.github.io/?https://github.com/DOCGroup/ACE_TAO/blob/master/TAO/docs/pluggable_protocols/index.html
315 class TAO_Export TAO_Transport
317 public:
318 /// Default creator, requires the tag value be supplied.
319 TAO_Transport (CORBA::ULong tag,
320 TAO_ORB_Core *orb_core,
321 size_t input_cdr_size = ACE_CDR::DEFAULT_BUFSIZE);
323 /// Destructor
324 virtual ~TAO_Transport ();
326 /// Return the protocol tag.
328 * The OMG assigns unique tags (a 32-bit unsigned number) to each
329 * protocol. New protocol tags can be obtained free of charge from
330 * the OMG, check the documents in corbafwd.h for more details.
332 CORBA::ULong tag () const;
334 /// Access the ORB that owns this connection.
335 TAO_ORB_Core *orb_core () const;
337 /// Get the TAO_Tranport_Mux_Strategy used by this object.
339 * The role of the TAO_Transport_Mux_Strategy is described in more
340 * detail in that class' documentation. Enough is to say that the
341 * class is used to control how many threads can have pending
342 * requests over the same connection. Multiplexing multiple threads
343 * over the same connection conserves resources and is almost
344 * required for AMI, but having only one pending request per
345 * connection is more efficient and reduces the possibilities of
346 * priority inversions.
348 TAO_Transport_Mux_Strategy *tms () const;
350 /// Return the TAO_Wait_Strategy used by this object.
352 * The role of the TAO_Wait_Strategy is described in more detail in
353 * that class' documentation. Enough is to say that the ORB can wait
354 * for a reply blocking on read(), using the Reactor to wait for
355 * multiple events concurrently or using the Leader/Followers
356 * protocol.
358 TAO_Wait_Strategy *wait_strategy () const;
360 enum Drain_Result_Enum
362 DR_ERROR = -1,
363 DR_OK = 0,
364 DR_QUEUE_EMPTY = 1, // used internally, not returned from drain_queue()
365 DR_WOULDBLOCK = 2
368 /// The handle_output and drain_queue* functions return objects of this
369 /// struct instead of the enum value directly so the compiler will catch
370 /// any uses that assign the return value to an int.
371 struct Drain_Result
373 Drain_Result (Drain_Result_Enum dre) : dre_(dre) {}
374 Drain_Result_Enum dre_;
376 bool operator== (Drain_Result rhs) const
378 return this->dre_ == rhs.dre_;
381 bool operator!= (Drain_Result rhs) const
383 return this->dre_ != rhs.dre_;
387 /// Callback method to reactively drain the outgoing data queue
388 Drain_Result handle_output (TAO::Transport::Drain_Constraints const & c);
390 /// Get the bidirectional flag
391 int bidirectional_flag () const;
393 /// Set the bidirectional flag
394 void bidirectional_flag (int flag);
396 /// Set the Cache Map entry
397 void cache_map_entry (TAO::Transport_Cache_Manager::HASH_MAP_ENTRY *entry);
399 /// Get the Cache Map entry
400 TAO::Transport_Cache_Manager::HASH_MAP_ENTRY *cache_map_entry ();
402 /// Set and Get the identifier for this transport instance.
404 * If not set, this will return an integer representation of
405 * the <code>this</code> pointer for the instance on which
406 * it's called.
408 size_t id () const;
409 void id (size_t id);
412 * Methods dealing with the role of the connection, e.g., CLIENT or SERVER.
413 * See CORBA 2.6 Specification, Section 15.5.1 for origin of definitions.
415 TAO::Connection_Role opened_as () const;
416 void opened_as (TAO::Connection_Role);
418 /// Get and Set the purging order. The purging strategy uses the set
419 /// version to set the purging order.
420 unsigned long purging_order () const;
421 void purging_order(unsigned long value);
423 /// Check if there are messages pending in the queue
425 * @return true if the queue is empty
427 bool queue_is_empty ();
429 /// Register with the reactor via the wait strategy
430 bool register_if_necessary ();
433 /// Added event handler to the handlers set.
435 * Called by the cache when the cache is closing.
437 * @param handlers The TAO_Connection_Handler_Set into which the
438 * transport should place its handler
440 void provide_handler (TAO::Connection_Handler_Set &handlers);
442 /// Add event handlers corresponding to transports that have RW wait
443 /// strategy to the handlers set.
445 * Called by the cache when the ORB is shutting down.
447 * @param handlers The TAO_Connection_Handler_Set into which the
448 * transport should place its handler if the transport has RW
449 * strategy on.
451 * @return true indicates a handler was added to the handler set.
452 * false indocates that the transport did not have a
453 * blockable handler that could be added.
455 bool provide_blockable_handler (TAO::Connection_Handler_Set &handlers);
457 /// Register the handler with the reactor.
459 * Register the handler with the reactor. This method is used by the
460 * Wait_On_Reactor strategy. The transport must register its event
461 * handler with the ORB's Reactor.
463 * @todo I think this method is pretty much useless, the
464 * connections are *always* registered with the Reactor, except in
465 * thread-per-connection mode. In that case putting the connection
466 * in the Reactor would produce unpredictable results anyway.
468 virtual int register_handler ();
470 /// Remove the handler from the reactor.
471 virtual int remove_handler ();
473 /// Write the complete Message_Block chain to the connection.
475 * This method serializes on handler_lock_, guaranteeing that only
476 * thread can execute it on the same instance concurrently.
478 * Often the implementation simply forwards the arguments to the
479 * underlying ACE_Svc_Handler class. Using the code factored out
480 * into ACE.
482 * Be careful with protocols that perform non-trivial
483 * transformations of the data, such as SSLIOP or protocols that
484 * compress the stream.
486 * @param iov contains the data that must be sent.
488 * @param timeout is the maximum time that the application is
489 * willing to wait for the data to be sent, useful in platforms that
490 * implement timed writes.
491 * The timeout value is obtained from the policies set by the
492 * application.
494 * @param bytes_transferred should return the total number of bytes
495 * successfully transferred before the connection blocked. This is
496 * required because in some platforms and/or protocols multiple
497 * system calls may be required to send the chain of message
498 * blocks. The first few calls can work successfully, but the final
499 * one can fail or signal a flow control situation (via EAGAIN).
500 * In this case the ORB expects the function to return -1, errno to
501 * be appropriately set and this argument to return the number of
502 * bytes already on the OS I/O subsystem.
504 * This call can also fail if the transport instance is no longer
505 * associated with a connection (e.g., the connection handler closed
506 * down). In that case, it returns -1 and sets errno to
507 * <code>ENOENT</code>.
509 virtual ssize_t send (iovec *iov,
510 int iovcnt,
511 size_t &bytes_transferred,
512 ACE_Time_Value const * timeout) = 0;
514 #if TAO_HAS_SENDFILE == 1
515 /// Send data through zero-copy write mechanism, if available.
517 * This method sends the data in the I/O vector through the platform
518 * sendfile() function to perform a zero-copy write, if available.
519 * Otherwise, the default fallback implementation simply delegates
520 * to the TAO_Transport::send() method.
522 * @note This method is best used when sending very large blocks of
523 * data.
525 virtual ssize_t sendfile (TAO_MMAP_Allocator * allocator,
526 iovec * iov,
527 int iovcnt,
528 size_t &bytes_transferred,
529 TAO::Transport::Drain_Constraints const & dc);
530 #endif /* TAO_HAS_SENDFILE==1 */
532 /// Read len bytes from into buf.
534 * This method serializes on handler_lock_, guaranteeing that only
535 * thread can execute it on the same instance concurrently.
537 * @param buffer ORB allocated buffer where the data should be
538 * @param timeout The ACE_Time_Value *s is just a place holder for now. It is
539 * not clear this this is the best place to specify this. The actual
540 * timeout values will be kept in the Policies.
542 virtual ssize_t recv (char *buffer,
543 size_t len,
544 const ACE_Time_Value *timeout = 0) = 0;
547 * @name Control connection lifecycle
549 * These methods are routed through the TMS object. The TMS
550 * strategies implement them correctly.
552 //@{
553 /// Request has been just sent, but the reply is not received. Idle
554 /// the transport now.
555 bool idle_after_send ();
557 /// Request is sent and the reply is received. Idle the transport
558 /// now.
559 bool idle_after_reply ();
561 /// Call the implementation method after obtaining the lock.
562 virtual void close_connection ();
563 //@}
565 /** @name Template methods
567 * The Transport class uses the Template Method Pattern to implement
568 * the protocol specific functionality.
569 * Implementors of a pluggable protocol should override the
570 * following methods with the semantics documented below.
573 * Initializing the messaging object. This would be used by the
574 * connector side. On the acceptor side the connection handler
575 * would take care of the messaging objects.
577 void messaging_init (TAO_GIOP_Message_Version const &version);
579 /// Extracts the list of listen points from the @a cdr stream. The
580 /// list would have the protocol specific details of the
581 /// ListenPoints
582 virtual int tear_listen_point_list (TAO_InputCDR &cdr);
584 /// Hooks that can be overridden in concrete transports.
586 * These hooks are invoked just after connection establishment (or
587 * after a connection is fetched from cache). The
588 * return value signifies whether the invoker should proceed with
589 * post connection establishment activities. Protocols like SSLIOP
590 * need this to verify whether connections already established have
591 * valid certificates. There are no pre_connect_hooks () since the
592 * transport doesn't exist before a connection establishment. :-)
594 * @note The methods are not made const with a reason.
596 virtual bool post_connect_hook ();
598 /// Memory management routines.
600 * Forwards to event handler.
602 ACE_Event_Handler::Reference_Count add_reference ();
603 ACE_Event_Handler::Reference_Count remove_reference ();
605 /// Return the messaging object that is used to format the data that
606 /// needs to be sent.
607 TAO_GIOP_Message_Base * messaging_object ();
609 /** @name Template methods
611 * The Transport class uses the Template Method Pattern to implement
612 * the protocol specific functionality.
613 * Implementors of a pluggable protocol should override the
614 * following methods with the semantics documented below.
616 //@{
617 /// Return the event handler used to receive notifications from the
618 /// Reactor.
620 * Normally a concrete TAO_Transport object has-a ACE_Event_Handler
621 * member that functions as an adapter between the ACE_Reactor
622 * framework and the TAO pluggable protocol framework.
623 * In all the protocols implemented so far this role is fullfilled
624 * by an instance of ACE_Svc_Handler.
626 * @todo Since we only use a limited functionality of
627 * ACE_Svc_Handler we could probably implement a generic
628 * adapter class (TAO_Transport_Event_Handler or something), this
629 * will reduce footprint and simplify the process of implementing a
630 * pluggable protocol.
632 * @todo This method has to be renamed to event_handler()
634 virtual ACE_Event_Handler * event_handler_i () = 0;
636 /// Is this transport really connected
637 bool is_connected () const;
639 /// Was a connection seen as closed during a read
640 bool connection_closed_on_read () const;
642 /// Perform all the actions when this transport get opened
643 bool post_open (size_t id);
645 /// do what needs to be done when closing the transport
646 void pre_close ();
648 /// Get the connection handler for this transport
649 TAO_Connection_Handler * connection_handler ();
651 /// Accessor for the output CDR stream
652 TAO_OutputCDR &out_stream ();
654 /// Accessor for synchronizing Transport OutputCDR access
655 TAO_SYNCH_MUTEX &output_cdr_lock ();
657 /// Can the transport be purged?
658 bool can_be_purged ();
660 virtual void set_bidir_context_info (TAO_Operation_Details &opdetails);
662 protected:
663 virtual TAO_Connection_Handler * connection_handler_i () = 0;
665 public:
666 /// This is a request for the transport object to write a
667 /// LocateRequest header before it is sent out.
668 int generate_locate_request (TAO_Target_Specification &spec,
669 TAO_Operation_Details &opdetails,
670 TAO_OutputCDR &output);
672 /// This is a request for the transport object to write a request
673 /// header before it sends out the request
674 virtual int generate_request_header (TAO_Operation_Details &opd,
675 TAO_Target_Specification &spec,
676 TAO_OutputCDR &msg);
678 /// Recache ourselves in the cache
679 int recache_transport (TAO_Transport_Descriptor_Interface* desc);
681 /// Callback to read incoming data
683 * The ACE_Event_Handler adapter invokes this method as part of its
684 * handle_input() operation.
686 * @todo the method name is confusing! Calling it handle_input()
687 * would probably make things easier to understand and follow!
689 * Once a complete message is read the Transport class delegates on
690 * the Messaging layer to invoke the right upcall (on the server) or
691 * the TAO_Reply_Dispatcher (on the client side).
693 * @param max_wait_time In some cases the I/O is synchronous, e.g. a
694 * thread-per-connection server or when Wait_On_Read is enabled. In
695 * those cases a maximum read time can be specified.
697 virtual int handle_input (TAO_Resume_Handle &rh,
698 ACE_Time_Value *max_wait_time = 0);
700 /// Prepare the waiting and demuxing strategy to receive a reply for
701 /// a new request.
703 * Preparing the ORB to receive the reply only once the request is
704 * completely sent opens the system to some subtle race conditions:
705 * suppose the ORB is running in a multi-threaded configuration,
706 * thread A makes a request while thread B is using the Reactor to
707 * process all incoming requests.
708 * Thread A could be implemented as follows:
709 * 1) send the request
710 * 2) setup the ORB to receive the reply
711 * 3) wait for the request
713 * but in this case thread B may receive the reply between step (1)
714 * and (2), and drop it as an invalid or unexpected message.
715 * Consequently the correct implementation is:
716 * 1) setup the ORB to receive the reply
717 * 2) send the request
718 * 3) wait for the reply
720 * The following method encapsulates this idiom.
722 * @todo This is generic code, it should be factored out into the
723 * Transport class.
725 // @nolock b/c this calls send_or_buffer
726 virtual int send_request (TAO_Stub *stub,
727 TAO_ORB_Core *orb_core,
728 TAO_OutputCDR &stream,
729 TAO_Message_Semantics message_semantics,
730 ACE_Time_Value *max_time_wait) = 0;
732 /// This method formats the stream and then sends the message on the
733 /// transport.
735 * Once the ORB is prepared to receive a reply (see send_request()
736 * above), and all the arguments have been marshaled the CDR stream
737 * must be 'formatted', i.e. the message_size field in the GIOP
738 * header can finally be set to the proper value.
741 virtual int send_message (TAO_OutputCDR &stream,
742 TAO_Stub *stub = 0,
743 TAO_ServerRequest *request = 0,
744 TAO_Message_Semantics message_semantics = TAO_Message_Semantics (),
745 ACE_Time_Value *max_time_wait = 0) = 0;
747 /// Sent the contents of @a message_block
749 * @param stub The object reference used for this operation, useful
750 * to obtain the current policies.
751 * @param message_semantics If this is set to TAO_TWO_REQUEST
752 * this method will block until the operation is completely
753 * written on the wire. If it is set to other values this
754 * operation could return.
755 * @param message_block The CDR encapsulation of the GIOP message
756 * that must be sent. The message may consist of
757 * multiple Message Blocks chained through the cont()
758 * field.
759 * @param max_wait_time The maximum time that the operation can
760 * block, used in the implementation of timeouts.
762 virtual int send_message_shared (TAO_Stub *stub,
763 TAO_Message_Semantics message_semantics,
764 const ACE_Message_Block *message_block,
765 ACE_Time_Value *max_wait_time);
767 protected:
768 /// Process the message by sending it to the higher layers of the
769 /// ORB.
770 int process_parsed_messages (TAO_Queued_Data *qd,
771 TAO_Resume_Handle &rh);
773 /// Implement send_message_shared() assuming the handler_lock_ is
774 /// held.
775 int send_message_shared_i (TAO_Stub *stub,
776 TAO_Message_Semantics message_semantics,
777 const ACE_Message_Block *message_block,
778 ACE_Time_Value *max_wait_time);
780 /// Queue a message for @a message_block
781 /// @param max_wait_time The maximum time that the operation can
782 /// block, used in the implementation of timeouts.
783 /// @param back If true, the message will be pushed to the back of the queue.
784 /// If false, the message will be pushed to the front of the queue.
785 int queue_message_i (const ACE_Message_Block *message_block,
786 ACE_Time_Value *max_wait_time, bool back=true);
789 * @brief Re-factor computation of I/O timeouts based on operation
790 * timeouts.
791 * Depending on the wait strategy, we need to timeout I/O operations or
792 * not. For example, if we are using a non-blocking strategy, we want
793 * to pass 0 to all I/O operations, and rely on the ACE_NONBLOCK
794 * settings on the underlying sockets. However, for blocking strategies
795 * we want to pass the operation timeouts, to respect the application
796 * level policies.
798 * This function was introduced as part of the fixes for bug 3647.
800 ACE_Time_Value const *io_timeout(
801 TAO::Transport::Drain_Constraints const & dc) const;
803 public:
804 /// Format and queue a message for @a stream
805 /// @param max_wait_time The maximum time that the operation can
806 /// block, used in the implementation of timeouts.
807 int format_queue_message (TAO_OutputCDR &stream,
808 ACE_Time_Value *max_wait_time,
809 TAO_Stub* stub);
812 * This is a very specialized interface to send a simple chain of
813 * messages through the Transport. The only place we use this interface
814 * is in GIOP_Message_Base.cpp, to send error messages (i.e., an
815 * indication that we received a malformed GIOP message,) and to close
816 * the connection.
819 int send_message_block_chain (const ACE_Message_Block *message_block,
820 size_t &bytes_transferred,
821 ACE_Time_Value *max_wait_time = 0);
823 /// Send a message block chain, assuming the lock is held
824 int send_message_block_chain_i (const ACE_Message_Block *message_block,
825 size_t &bytes_transferred,
826 TAO::Transport::Drain_Constraints const & dc);
828 /// Cache management
829 int purge_entry ();
831 /// Cache management
832 int make_idle ();
834 /// Cache management
835 int update_transport ();
837 /// The timeout callback, invoked when any of the timers related to
838 /// this transport expire.
840 * @param current_time The current time as reported from the Reactor
841 * @param act The Asynchronous Completion Token. Currently it is
842 * interpreted as follows:
843 * - If the ACT is the address of this->current_deadline_ the
844 * queueing timeout has expired and the queue should start
845 * flushing.
847 * @return Returns 0 if there are no problems, -1 if there is an
848 * error
850 * @todo In the future this function could be used to expire
851 * messages (oneways) that have been sitting for too long on
852 * the queue.
854 int handle_timeout (const ACE_Time_Value &current_time, const void* act);
856 /// Accessor to recv_buffer_size_
857 size_t recv_buffer_size () const;
859 /// Accessor to sent_byte_count_
860 size_t sent_byte_count () const;
862 /// CodeSet Negotiation - Get the char codeset translator factory
863 TAO_Codeset_Translator_Base *char_translator () const;
865 /// CodeSet Negotiation - Get the wchar codeset translator factory
866 TAO_Codeset_Translator_Base *wchar_translator () const;
868 /// CodeSet negotiation - Set the char codeset translator factory
869 void char_translator (TAO_Codeset_Translator_Base *);
871 /// CodeSet negotiation - Set the wchar codeset translator factory
872 void wchar_translator (TAO_Codeset_Translator_Base *);
874 /// Use the Transport's codeset factories to set the translator for input
875 /// and output CDRs.
876 void assign_translators (TAO_InputCDR *, TAO_OutputCDR *);
878 /// It is necessary to clear the codeset translator when a CDR stream
879 /// is used for more than one GIOP message. This is required since the
880 /// header must not be translated, whereas the body must be.
881 void clear_translators (TAO_InputCDR *, TAO_OutputCDR *);
883 /// Return true if the tcs has been set
884 CORBA::Boolean is_tcs_set() const;
886 /// Set the state of the first_request_ to flag.
887 void first_request_sent (bool flag = false);
889 /// Get the first request flag
890 bool first_request () const;
892 /// Notify all the components inside a Transport when the underlying
893 /// connection is closed.
894 void send_connection_closed_notifications ();
896 /// Transport statistics
897 TAO::Transport::Stats* stats () const;
899 private:
900 /// Helper method that returns the Transport Cache Manager.
901 TAO::Transport_Cache_Manager &transport_cache_manager ();
903 /// Send some of the data in the queue.
905 * As the outgoing data is drained this method is invoked to send as
906 * much of the current message as possible.
908 Drain_Result drain_queue (TAO::Transport::Drain_Constraints const & dc);
910 /// Implement drain_queue() assuming the lock is held
911 Drain_Result drain_queue_i (TAO::Transport::Drain_Constraints const & dc);
913 /// Check if there are messages pending in the queue
915 * This version assumes that the lock is already held. Use with
916 * care!
918 * @return true if the queue is empty
920 bool queue_is_empty_i () const;
922 /// A helper routine used in drain_queue_i()
923 Drain_Result drain_queue_helper (int &iovcnt, iovec iov[],
924 TAO::Transport::Drain_Constraints const & dc);
926 /// These classes need privileged access to:
927 /// - schedule_output_i()
928 /// - cancel_output_i()
929 friend class TAO_Reactive_Flushing_Strategy;
930 friend class TAO_Leader_Follower_Flushing_Strategy;
932 /// Needs priveleged access to
933 /// event_handler_i ()
934 friend class TAO_Thread_Per_Connection_Handler;
936 /// Schedule handle_output() callbacks
937 int schedule_output_i ();
939 /// Cancel handle_output() callbacks
940 int cancel_output_i ();
942 /// Cleanup the queue.
944 * Exactly @a byte_count bytes have been sent, the queue must be
945 * cleaned up as potentially several messages have been completely
946 * sent out.
947 * It leaves on head_ the next message to send out.
949 void cleanup_queue (size_t byte_count);
951 /// Cleanup the complete queue
952 void cleanup_queue_i ();
954 /// Check if the buffering constraints have been reached
955 bool check_buffering_constraints_i (TAO_Stub *stub, bool &must_flush);
957 /// Send a synchronous message, i.e. block until the message is on
958 /// the wire
959 int send_synchronous_message_i (const ACE_Message_Block *message_block,
960 ACE_Time_Value *max_wait_time);
962 /// Send a reply message, i.e. do not block until the message is on
963 /// the wire, but just return after adding them to the queue.
964 int send_reply_message_i (const ACE_Message_Block *message_block,
965 ACE_Time_Value *max_wait_time);
967 /// Send an asynchronous message, i.e. do not block until the message is on
968 /// the wire
969 int send_asynchronous_message_i (TAO_Stub *stub,
970 const ACE_Message_Block *message_block,
971 ACE_Time_Value *max_wait_time);
973 /// A helper method used by send_synchronous_message_i() and
974 /// send_reply_message_i(). Reusable code that could be used by both
975 /// the methods.
976 int send_synch_message_helper_i (TAO_Synch_Queued_Message &s,
977 ACE_Time_Value *max_wait_time);
979 /// Check if the flush timer is still pending
980 int flush_timer_pending () const;
982 /// The flush timer expired or was explicitly cancelled, mark it as
983 /// not pending
984 void reset_flush_timer ();
986 /// Print out error messages if the event handler is not valid
987 void report_invalid_event_handler (const char *caller);
989 /// Is invoked by handle_input operation. It consolidate message on
990 /// top of incoming_message_stack. The amount of missing data is
991 /// known and recv operation copies data directly into message buffer,
992 /// as much as a single recv-invocation provides.
993 int handle_input_missing_data (TAO_Resume_Handle &rh,
994 ACE_Time_Value *max_wait_time,
995 TAO_Queued_Data *q_data);
997 /// Is invoked by handle_input operation. It parses new messages from input stream
998 /// or consolidates messages whose header has been partially read, the message
999 /// size being unknown so far. It parses as much data as a single recv-invocation provides.
1000 int handle_input_parse_data (TAO_Resume_Handle &rh,
1001 ACE_Time_Value *max_wait_time);
1003 /// Is invoked by handle_input_parse_data. Parses all messages remaining
1004 /// in @a message_block.
1005 int handle_input_parse_extra_messages (ACE_Message_Block &message_block);
1007 /// @return -1 error, otherwise 0
1008 int consolidate_enqueue_message (TAO_Queued_Data *qd);
1010 /// @return -1 error, otherwise 0
1011 int consolidate_process_message (TAO_Queued_Data *qd, TAO_Resume_Handle &rh);
1014 * Process the message that is in the head of the incoming queue.
1015 * If there are more messages in the queue, this method calls
1016 * this->notify_reactor () to wake up a thread
1017 * @retval -1 on error
1018 * @retval 0 if successfully processing enqueued messages
1019 * @retval 1 if no message present in queue
1021 int process_queue_head (TAO_Resume_Handle &rh);
1024 * This call prepares a new handler for the notify call and sends a
1025 * notify () call to the reactor.
1027 int notify_reactor ();
1029 protected:
1031 * Same as notify_reactor above but does NOT first check for a
1032 * registered TAO_Wait_Strategy.
1034 int notify_reactor_now ();
1036 private:
1037 TAO_Transport (const TAO_Transport &) = delete;
1038 TAO_Transport &operator= (const TAO_Transport &) = delete;
1040 /// Assume the lock is held
1041 void send_connection_closed_notifications_i ();
1043 /// Allocate a partial message block and store it in our
1044 /// partial_message_ data member.
1045 void allocate_partial_message_block ();
1048 * Return true if blocking I/O should be used for sending synchronous
1049 * (two-way, reliable oneways, etc.) messages. This is determined based
1050 * on the current flushing and waiting strategies.
1052 bool using_blocking_io_for_synch_messages() const;
1055 * Return true if blocking I/O should be used for sending asynchronous
1056 * (AMI calls, non-blocking oneways, responses to operations, etc.)
1057 * messages. This is determined based on the current flushing strategy.
1059 bool using_blocking_io_for_asynch_messages() const;
1062 * Specialization hook to add concrete private methods from
1063 * TAO's protocol implementation onto the base Transport class
1066 protected:
1067 /// IOP protocol tag.
1068 CORBA::ULong const tag_;
1070 /// Global orbcore resource.
1071 TAO_ORB_Core * const orb_core_;
1073 /// Our entry in the cache. We don't own this. It is here for our
1074 /// convenience. We cannot just change things around.
1075 TAO::Transport_Cache_Manager::HASH_MAP_ENTRY *cache_map_entry_;
1077 /// Strategy to decide whether multiple requests can be sent over the
1078 /// same connection or the connection is exclusive for a request.
1079 TAO_Transport_Mux_Strategy *tms_;
1081 /// Strategy for waiting for the reply after sending the request.
1082 TAO_Wait_Strategy *ws_;
1084 /// Use to check if bidirectional info has been synchronized with
1085 /// the peer.
1087 * Have we sent any info on bidirectional information or have we
1088 * received any info regarding making the connection served by this
1089 * transport bidirectional.
1090 * The flag is used as follows:
1091 * + We dont want to send the bidirectional context info more than
1092 * once on the connection. Why? Waste of marshalling and
1093 * demarshalling time on the client.
1094 * + On the server side -- once a client that has established the
1095 * connection asks the server to use the connection both ways, we
1096 * *dont* want the server to pack service info to the client. That
1097 * is not allowed. We need a flag to prevent such a things from
1098 * happening.
1100 * The value of this flag will be 0 if the client sends info and 1
1101 * if the server receives the info.
1103 int bidirectional_flag_;
1105 TAO::Connection_Role opening_connection_role_;
1107 /// Implement the outgoing data queue
1108 TAO_Queued_Message *head_;
1109 TAO_Queued_Message *tail_;
1111 /// Queue of the consolidated, incoming messages..
1112 TAO_Incoming_Message_Queue incoming_message_queue_;
1114 /// Stack of incoming fragments, consolidated messages
1115 /// are going to be enqueued in "incoming_message_queue_"
1116 TAO::Incoming_Message_Stack incoming_message_stack_;
1118 /// The queue will start draining no later than <queeing_deadline_>
1119 /// *if* the deadline is
1120 ACE_Time_Value current_deadline_;
1122 /// The timer ID
1123 long flush_timer_id_;
1125 /// The adapter used to receive timeout callbacks from the Reactor
1126 TAO_Transport_Timer transport_timer_;
1128 /// Lock that insures that activities that *might* use handler-related
1129 /// resources (such as a connection handler) get serialized.
1131 * This is an <code>ACE_Lock</code> that gets initialized from
1132 * @c TAO_ORB_Core::resource_factory()->create_cached_connection_lock().
1133 * This way, one can use a lock appropriate for the type of system, i.e.,
1134 * a null lock for single-threaded systems, and a real lock for
1135 * multi-threaded systems.
1137 mutable ACE_Lock *handler_lock_;
1139 /// A unique identifier for the transport.
1141 * This never *never* changes over the lifespan, so we don't have to worry
1142 * about locking it.
1144 * HINT: Protocol-specific transports that use connection handler
1145 * might choose to set this to the handle for their connection.
1147 size_t id_;
1149 /// Used by the LRU, LFU and FIFO Connection Purging Strategies.
1150 unsigned long purging_order_;
1152 /// Size of the buffer received.
1153 size_t recv_buffer_size_;
1155 /// Number of bytes sent.
1156 size_t sent_byte_count_;
1158 /// Is this transport really connected or not. In case of oneways with
1159 /// SYNC_NONE Policy we don't wait until the connection is ready and we
1160 /// buffer the requests in this transport until the connection is ready
1161 bool is_connected_;
1163 /// Track if connection was seen as closed during a read so that
1164 /// invocation can optionally be retried using a different profile.
1165 /// Note that this could result in violate the "at most once" CORBA
1166 /// semantics.
1167 bool connection_closed_on_read_;
1169 private:
1170 /// Our messaging object.
1171 TAO_GIOP_Message_Base *messaging_object_;
1173 /// @@Phil, I think it would be nice if we could think of a way to
1174 /// do the following.
1175 /// We have been trying to use the transport for marking about
1176 /// translator factories and such! IMHO this is a wrong encapulation
1177 /// ie. trying to populate the transport object with these
1178 /// details. We should probably have a class something like
1179 /// TAO_Message_Property or TAO_Message_Translator or whatever (I am
1180 /// sure you get the idea) and encapsulate all these
1181 /// details. Coupling these seems odd. if I have to be more cynical
1182 /// we can move this to the connection_handler and it may more sense
1183 /// with the DSCP stuff around there. Do you agree?
1185 /// Additional member values required to support codeset translation
1186 TAO_Codeset_Translator_Base *char_translator_;
1187 TAO_Codeset_Translator_Base *wchar_translator_;
1189 /// The tcs_set_ flag indicates that negotiation has occurred and so the
1190 /// translators are correct, since a null translator is valid if both ends
1191 /// are using the same codeset, whatever that codeset might be.
1192 CORBA::Boolean tcs_set_;
1194 /// First_request_ is true until the first request is sent or received. This
1195 /// is necessary since codeset context information is necessary only on the
1196 /// first request. After that, the translators are fixed for the life of the
1197 /// connection.
1198 bool first_request_;
1200 /// Holds the partial GIOP message (if there is one)
1201 ACE_Message_Block* partial_message_;
1203 #if TAO_HAS_SENDFILE == 1
1204 /// mmap()-based allocator used to allocator output CDR buffers.
1206 * If this pointer is non-zero, sendfile() will be used to send data
1207 * in a TAO_OutputCDR stream instance.
1209 TAO_MMAP_Allocator * const mmap_allocator_;
1210 #endif /* TAO_HAS_SENDFILE==1 */
1212 #if TAO_HAS_TRANSPORT_CURRENT == 1
1213 /// Statistics
1214 TAO::Transport::Stats* stats_;
1215 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */
1217 /// Indicate that flushing needs to be done in post_open()
1218 bool flush_in_post_open_;
1220 /// lock for synchronizing Transport OutputCDR access
1221 mutable TAO_SYNCH_MUTEX output_cdr_mutex_;
1224 #if TAO_HAS_TRANSPORT_CURRENT == 1
1225 namespace TAO
1227 namespace Transport
1230 * @class Stats
1232 * @brief Used to collect stats on a transport.
1234 * The base class in (potentially) extensible hierarchy used to
1235 * specialize the information available for a specific protocol.
1237 * This class is necessary for the implementation of the Transport
1238 * Current feature.
1240 * <B>See Also:</B>
1242 * https://htmlpreview.github.io/?https://github.com/DOCGroup/ACE_TAO/blob/master/TAO/docs/transport_current/index.html
1245 class TAO_Export Stats
1247 public:
1248 Stats ();
1249 virtual ~Stats ();
1251 void messages_sent (size_t message_length);
1252 CORBA::LongLong messages_sent () const;
1253 CORBA::LongLong bytes_sent () const;
1255 void messages_received (size_t message_length);
1256 CORBA::LongLong messages_received () const;
1257 CORBA::LongLong bytes_received () const;
1259 void opened_since (const ACE_Time_Value& tv);
1260 const ACE_Time_Value& opened_since () const;
1262 private:
1263 /// Mutex guarding the internal state of the statistics
1264 mutable TAO_SYNCH_MUTEX stat_mutex_;
1266 /// The bytes_rcvd_.samples_count() could have been used instead,
1267 /// however there was a suspicion that 32 bits would be
1268 /// insufficient.
1269 CORBA::LongLong messages_rcvd_;
1271 /// The bytes_sent_.samples_count() could have been used instead,
1272 /// however there was a suspicion that 32 bits would be
1273 /// insufficient.
1274 CORBA::LongLong messages_sent_;
1276 ACE_Basic_Stats bytes_rcvd_;
1277 ACE_Basic_Stats bytes_sent_;
1279 ACE_Time_Value opened_since_;
1283 #endif /* TAO_HAS_TRANSPORT_CURRENT == 1 */
1285 TAO_END_VERSIONED_NAMESPACE_DECL
1287 #if defined (__ACE_INLINE__)
1288 # include "tao/Transport.inl"
1289 #endif /* __ACE_INLINE__ */
1291 #include /**/ "ace/post.h"
1293 #endif /* TAO_TRANSPORT_H */