2 #include "tao/GIOP_Message_Base.h"
3 #include "tao/operation_details.h"
5 #include "tao/ORB_Core.h"
6 #include "tao/TAO_Server_Request.h"
7 #include "tao/GIOP_Message_Locate_Header.h"
8 #include "tao/Transport.h"
9 #include "tao/Transport_Mux_Strategy.h"
10 #include "tao/LF_Strategy.h"
11 #include "tao/Request_Dispatcher.h"
12 #include "tao/Codeset_Manager.h"
13 #include "tao/SystemException.h"
14 #include "tao/ZIOP_Adapter.h"
15 #include "ace/Min_Max.h"
17 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
// Constructor.
// Stores <orb_core>, asks it for the fragmentation strategy to use with
// <transport>, and builds out_stream_ from the ORB core's output CDR
// allocators (buffer, data block, message block), its
// cdr_memcpy_tradeoff () parameter and that fragmentation strategy.
// When TAO_HAS_MONITOR_POINTS == 1 a monitor is also registered on
// out_stream_; its name is "OutputCDR_" followed by a hex string that is
// 2 * sizeof (size_t) characters long.
// NOTE(review): the use of <input_cdr_size> and the value formatted into
// hex_string are not visible in this excerpt -- confirm against the full file.
19 TAO_GIOP_Message_Base::TAO_GIOP_Message_Base (TAO_ORB_Core
*orb_core
,
20 TAO_Transport
*transport
,
21 size_t input_cdr_size
)
22 : orb_core_ (orb_core
)
23 , fragmentation_strategy_ (orb_core
->fragmentation_strategy (transport
))
24 , out_stream_ (nullptr,
27 orb_core
->output_cdr_buffer_allocator (),
28 orb_core
->output_cdr_dblock_allocator (),
29 orb_core
->output_cdr_msgblock_allocator (),
30 orb_core
->orb_params ()->cdr_memcpy_tradeoff (),
31 fragmentation_strategy_
,
35 #if defined (TAO_HAS_MONITOR_POINTS) && (TAO_HAS_MONITOR_POINTS == 1)
// Two hex digits (nibbles) per byte of a size_t, plus the terminating NUL.
36 const int nibbles
= 2 * sizeof (size_t);
37 char hex_string
[nibbles
+ 1];
38 ACE_OS::sprintf (hex_string
,
41 hex_string
[nibbles
] = '\0';
42 ACE_CString
monitor_name ("OutputCDR_");
43 monitor_name
+= hex_string
;
44 this->out_stream_
.register_monitor (monitor_name
.c_str ());
45 #endif /* TAO_HAS_MONITOR_POINTS==1 */
// Destructor.
// Unregisters the out_stream_ monitor (only when monitor points are
// compiled in) and deletes the fragmentation strategy obtained in the
// constructor.
49 TAO_GIOP_Message_Base::~TAO_GIOP_Message_Base ()
51 #if defined (TAO_HAS_MONITOR_POINTS) && (TAO_HAS_MONITOR_POINTS == 1)
52 this->out_stream_
.unregister_monitor ();
53 #endif /* TAO_HAS_MONITOR_POINTS==1 */
54 delete fragmentation_strategy_
;
// Records the GIOP version (<major>.<minor>) on the shared output stream so
// that messages marshaled through out_stream_ carry that version.
58 TAO_GIOP_Message_Base::init (CORBA::Octet major
, CORBA::Octet minor
)
60 // Set the giop version of the out stream
61 this->out_stream_
.set_version (major
, minor
);
// Accessor for the output CDR stream owned by this object (out_stream_).
65 TAO_GIOP_Message_Base::out_stream ()
67 return this->out_stream_
;
// Writes a complete GIOP Request header into <cdr>.
// Steps visible here:
//   1. read the GIOP version stored in <cdr>;
//   2. write the GIOP protocol header with message type GIOP::Request;
//   3. let the version-specific generator/parser write the request header
//      from <op> and <spec>.
// A failure in step 2 or 3 is logged with TAOLIB_ERROR.
// NOTE(review): the values returned on success/failure are not visible in
// this excerpt.
71 TAO_GIOP_Message_Base::generate_request_header (
72 TAO_Operation_Details
&op
,
73 TAO_Target_Specification
&spec
,
76 // Get a parser for us
77 TAO_GIOP_Message_Version giop_version
;
79 cdr
.get_version (giop_version
);
81 // Write the GIOP header first
82 if (!this->write_protocol_header (GIOP::Request
, giop_version
, cdr
))
86 TAOLIB_ERROR ((LM_ERROR
,
87 ACE_TEXT ("(%P|%t) Error in writing GIOP header\n")));
93 // Get the parser we need to use
94 TAO_GIOP_Message_Generator_Parser
*generator_parser
=
95 this->get_parser (giop_version
);
97 // Now call the implementation for the rest of the header
98 if (!generator_parser
->write_request_header (op
, spec
, cdr
))
101 TAOLIB_ERROR ((LM_ERROR
,
102 ACE_TEXT ("(%P|%t) Error in writing request header\n")));
// Writes a complete GIOP LocateRequest header into <cdr>.
// The GIOP version is read from <cdr>, the matching generator/parser is
// selected, the protocol header is written with message type
// GIOP::LocateRequest, and the parser then writes the locate request header
// using the request id taken from <op> and the target in <spec>.
// A failure in either write is logged with TAOLIB_ERROR.
// NOTE(review): the values returned on success/failure are not visible in
// this excerpt.
111 TAO_GIOP_Message_Base::generate_locate_request_header (
112 TAO_Operation_Details
&op
,
113 TAO_Target_Specification
&spec
,
116 TAO_GIOP_Message_Version giop_version
;
118 cdr
.get_version (giop_version
);
120 // Get the parser we need to use
121 TAO_GIOP_Message_Generator_Parser
*generator_parser
=
122 this->get_parser (giop_version
);
124 // Write the GIOP header first
125 if (!this->write_protocol_header (GIOP::LocateRequest
, giop_version
, cdr
))
128 TAOLIB_ERROR ((LM_ERROR
,
129 ACE_TEXT ("(%P|%t) Error in writing GIOP header\n")));
134 // Now call the implementation for the rest of the header
135 if (!generator_parser
->write_locate_request_header
136 (op
.request_id (), spec
, cdr
))
139 TAOLIB_ERROR ((LM_ERROR
,
140 ACE_TEXT ("(%P|%t) Error in writing locate request header\n")));
// Writes a complete GIOP Reply header into <cdr>.
// The protocol header is written with message type GIOP::Reply, then the
// version-specific generator/parser writes the reply header from the reply
// parameters. A protocol-header failure is logged with TAOLIB_ERROR; a
// reply-header failure is logged only when TAO_debug_level > 4. A
// CORBA::Exception raised along the way is caught here and printed when
// TAO_debug_level > 4.
// NOTE(review): `¶ms` in the signature looks like a mis-encoded `&params`
// (HTML entity damage) -- confirm against the original file.
150 TAO_GIOP_Message_Base::generate_reply_header (
152 TAO_Pluggable_Reply_Params_Base
¶ms
)
154 TAO_GIOP_Message_Version giop_version
;
156 cdr
.get_version (giop_version
);
158 // Write the GIOP header first
159 if (!this->write_protocol_header (GIOP::Reply
, giop_version
, cdr
))
162 TAOLIB_ERROR ((LM_ERROR
,
163 ACE_TEXT ("(%P|%t) Error in writing GIOP header\n")));
170 // Get the parser we need to use
171 TAO_GIOP_Message_Generator_Parser
*generator_parser
=
172 this->get_parser (giop_version
);
174 // Now call the implementation for the rest of the header
175 if (!generator_parser
->write_reply_header (cdr
, params
))
177 if (TAO_debug_level
> 4)
178 TAOLIB_ERROR ((LM_ERROR
,
179 ACE_TEXT ("(%P|%t) Error in writing reply ")
180 ACE_TEXT ("header\n")));
185 catch (const ::CORBA::Exception
& ex
)
187 if (TAO_debug_level
> 4)
188 ex
._tao_print_exception (
189 ACE_TEXT ("TAO_GIOP_Message_Base::generate_reply_header"));
// Writes a GIOP Fragment header for <request_id> into <cdr>.
// The GIOP version stored in <cdr> is examined first: versions 1.0 and 1.1
// are singled out by the check below (the action taken for them is not
// visible in this excerpt). Otherwise the protocol header is written with
// message type GIOP::Fragment followed by the version-specific fragment
// header; a failure of either write is logged with TAOLIB_ERROR.
198 TAO_GIOP_Message_Base::generate_fragment_header (TAO_OutputCDR
& cdr
,
199 CORBA::ULong request_id
)
201 TAO_GIOP_Message_Version giop_version
;
203 cdr
.get_version (giop_version
);
205 // GIOP fragments are supported in GIOP 1.1 and better, but TAO only
206 // supports them in 1.2 or better since GIOP 1.1 fragments do not
207 // have a fragment message header.
208 if (giop_version
.major
== 1 && giop_version
.minor
< 2)
211 // Get the parser we need to use
212 TAO_GIOP_Message_Generator_Parser
*generator_parser
=
213 this->get_parser (giop_version
);
215 // Write the GIOP header first
216 if (!this->write_protocol_header (GIOP::Fragment
, giop_version
, cdr
)
217 || !generator_parser
->write_fragment_header (cdr
, request_id
))
220 TAOLIB_ERROR ((LM_ERROR
,
221 ACE_TEXT ("(%P|%t) Error in writing GIOP header\n")));
// Dumps the contents of <stream> through dump_msg () with the label "send".
// If the stream's first message block has a continuation, a temporary
// ACE_Message_Block is allocated and the chain is consolidated into it so a
// single contiguous buffer can be dumped; otherwise the stream's own buffer
// is used. The temporary block (if any) is deleted afterwards.
230 TAO_GIOP_Message_Base::dump_consolidated_msg (TAO_OutputCDR
&stream
)
232 // Check whether the output cdr stream is built up of multiple
233 // messageblocks. If so, consolidate them to one block that can be
235 ACE_Message_Block
* consolidated_block
= nullptr;
236 char *buf
= const_cast <char*> (stream
.buffer ());
237 size_t const total_len
= stream
.total_length ();
238 if (stream
.begin()->cont () != nullptr)
240 ACE_NEW_RETURN (consolidated_block
, ACE_Message_Block
, 0);
241 ACE_CDR::consolidate (consolidated_block
, stream
.begin ());
242 buf
= (char *) (consolidated_block
->rd_ptr ());
245 this->dump_msg ("send", reinterpret_cast <u_char
*> (buf
), total_len
);
// Deleting a null pointer is a no-op, so this is safe when no
// consolidation took place.
247 delete consolidated_block
;
248 consolidated_block
= nullptr;
// Finalizes an outgoing GIOP message held in <stream>.
// Visible steps:
//   - set the GIOP flags octet via set_giop_flags ();
//   - when ZIOP is compiled in, obtain the ZIOP adapter, dump the message,
//     and let the adapter marshal the data using either <stub> or <request>;
//     a debug line is emitted when logging is enabled and the message was
//     not compressed;
//   - compute the body length (total length minus the GIOP header length)
//     and patch it into the header at TAO_GIOP_MESSAGE_SIZE_OFFSET, written
//     directly or byte-swapped with ACE_CDR::swap_4 ();
//   - dump the final message.
// Message dumping is tied to TAO_debug_level > 9 through <log_msg>.
// NOTE(review): several conditions and the return value are not visible in
// this excerpt -- confirm against the full file.
254 TAO_GIOP_Message_Base::format_message (TAO_OutputCDR
&stream
, TAO_Stub
*stub
, TAO_ServerRequest
*request
)
256 this->set_giop_flags (stream
);
258 bool log_msg
= TAO_debug_level
> 9;
261 #if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP != 0
262 TAO_ZIOP_Adapter
* ziop_adapter
= this->orb_core_
->ziop_adapter ();
267 this->dump_consolidated_msg (stream
);
270 const bool compressed
=
272 ziop_adapter
->marshal_data (stream
, *stub
) :
273 ziop_adapter
->marshal_data (stream
, *this->orb_core_
, request
);
275 if (log_msg
&& !compressed
)
277 TAOLIB_DEBUG ((LM_DEBUG
,
278 ACE_TEXT ("TAO (%P|%t) - ")
279 ACE_TEXT ("TAO_GIOP_Message_Base::format_message, ")
280 ACE_TEXT ("GIOP message not compressed\n")));
282 // no need to log. If compressed->ZIOP library dumps message
283 // if not compressed (due to failure or policy settings)
284 // message hasn't changed and was already dumped
285 // prior to compression...
289 #endif /* TAO_HAS_ZIOP */
292 // Length of all buffers.
293 size_t const total_len
= stream
.total_length ();
295 // NOTE: Here would also be a fine place to calculate a digital
296 // signature for the message and place it into a preallocated slot
297 // in the "ServiceContext". Similarly, this is a good spot to
298 // encrypt messages (or just the message bodies) if that's needed in
299 // this particular environment and that isn't handled by the
300 // networking infrastructure (e.g., IPSEC).
302 char *buf
= const_cast <char*> (stream
.buffer ());
// The GIOP size field counts only the bytes that follow the fixed header.
304 CORBA::ULong bodylen
= static_cast <CORBA::ULong
>
305 (total_len
- TAO_GIOP_MESSAGE_HEADER_LEN
);
307 #if !defined (ACE_ENABLE_SWAP_ON_WRITE)
308 *(reinterpret_cast <CORBA::ULong
*> (buf
+
309 TAO_GIOP_MESSAGE_SIZE_OFFSET
)) = bodylen
;
311 if (!stream
.do_byte_swap ())
312 *(reinterpret_cast <CORBA::ULong
*>
313 (buf
+ TAO_GIOP_MESSAGE_SIZE_OFFSET
)) = bodylen
;
315 ACE_CDR::swap_4 (reinterpret_cast <char *> (&bodylen
),
316 buf
+ TAO_GIOP_MESSAGE_SIZE_OFFSET
);
317 #endif /* ACE_ENABLE_SWAP_ON_WRITE */
321 this->dump_consolidated_msg (stream
);
// Parses the GIOP header of the message held in <qd> without copying data.
// - If fewer than TAO_GIOP_MESSAGE_HEADER_LEN bytes are available, the
//   missing-data marker is set to TAO_MISSING_DATA_UNDEFINED and 0 is
//   returned (incomplete header).
// - Otherwise the header is parsed; when the full message (header plus
//   payload) is larger than what the block holds, the shortfall is recorded
//   through qd.missing_data ().
// - The out-parameter <mesg_length> receives the full message size and 1 is
//   returned (complete header).
// NOTE(review): the action taken when parse_message_header () returns -1 is
// not visible in this excerpt.
328 TAO_GIOP_Message_Base::parse_next_message (TAO_Queued_Data
&qd
,
331 if (qd
.msg_block ()->length () < TAO_GIOP_MESSAGE_HEADER_LEN
)
333 qd
.missing_data (TAO_MISSING_DATA_UNDEFINED
);
335 return 0; /* incomplete header */
339 TAO_GIOP_Message_State state
;
341 if (state
.parse_message_header (*(qd
.msg_block ())) == -1)
346 size_t const message_size
= state
.message_size (); /* Header + Payload */
348 if (message_size
> qd
.msg_block ()->length ())
350 qd
.missing_data (message_size
- qd
.msg_block ()->length ());
357 /* init out-parameters */
359 mesg_length
= message_size
;
361 return 1; /* complete header */
// Extracts the next GIOP message (or the part of it that has arrived) from
// <incoming> into a newly made queued-data node returned through <qd>.
// Two cases are visible:
//   * <incoming> holds less than a GIOP header: if it holds any bytes at
//     all, a node with room for at least a header is made, all available
//     bytes are copied in and consumed from <incoming>, and the node is
//     marked TAO_MISSING_DATA_UNDEFINED; the "reset" assignment sets <qd>
//     to nullptr.
//   * <incoming> holds at least a header: the header is parsed, a node
//     sized for the whole message is made, as many bytes as are available
//     (up to the message size) are copied, the shortfall (or 0) is recorded
//     as missing data, and the copied bytes are consumed from <incoming>.
// "out of memory" is logged when TAO_debug_level > 0.
// NOTE(review): the conditions guarding the log calls and all return values
// are not visible in this excerpt.
366 TAO_GIOP_Message_Base::extract_next_message (ACE_Message_Block
&incoming
,
367 TAO_Queued_Data
*&qd
)
369 if (incoming
.length () < TAO_GIOP_MESSAGE_HEADER_LEN
)
371 if (incoming
.length () > 0)
373 // Optimize memory usage, we don't know actual message size
374 // so far, but allocate enough space to hold small GIOP
375 // messages. This way we avoid expensive "grow" operation
376 // for small messages.
377 size_t const default_buf_size
= ACE_CDR::DEFAULT_BUFSIZE
;
379 // Make a node which has at least message block of the size
380 // of MESSAGE_HEADER_LEN.
381 size_t const buf_size
= ace_max (TAO_GIOP_MESSAGE_HEADER_LEN
,
384 // POST: buf_size >= TAO_GIOP_MESSAGE_HEADER_LEN
386 qd
= this->make_queued_data (buf_size
);
390 if (TAO_debug_level
> 0)
392 TAOLIB_ERROR((LM_ERROR
,
393 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::extract_next_message, ")
394 ACE_TEXT ("out of memory\n")));
399 qd
->msg_block ()->copy (incoming
.rd_ptr (), incoming
.length ());
401 incoming
.rd_ptr (incoming
.length ()); // consume all available data
403 qd
->missing_data (TAO_MISSING_DATA_UNDEFINED
);
407 // handle not initialized variables
408 qd
= nullptr; // reset
414 TAO_GIOP_Message_State state
;
415 if (state
.parse_message_header (incoming
) == -1)
// Full message size (header + payload); reduced below when less is available.
420 size_t copying_len
= state
.message_size ();
422 qd
= this->make_queued_data (copying_len
);
426 if (TAO_debug_level
> 0)
428 TAOLIB_ERROR ((LM_ERROR
,
429 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::extract_next_message, ")
430 ACE_TEXT ("out of memory\n")));
435 if (copying_len
> incoming
.length ())
437 qd
->missing_data (copying_len
- incoming
.length ());
438 copying_len
= incoming
.length ();
442 qd
->missing_data (0);
445 qd
->msg_block ()->copy (incoming
.rd_ptr (), copying_len
);
447 incoming
.rd_ptr (copying_len
);
// Appends newly arrived bytes from <incoming> to a partially filled node
// <qd>.
// Two cases are visible:
//   * qd->missing_data () == TAO_MISSING_DATA_UNDEFINED (header not yet
//     parsed): copy as many bytes as are needed/available to complete the
//     GIOP header; return 0 if the header is still incomplete; otherwise
//     parse the header, grow the node's message block to the full message
//     size, copy as much payload as is available, record the remaining
//     shortfall (or 0) and consume the copied bytes from <incoming>.
//   * otherwise (header already parsed): copy up to qd->missing_data ()
//     bytes from <incoming> and update the remaining shortfall.
// NOTE(review): the bodies of the failure branches and the final return
// value are not visible in this excerpt.
454 TAO_GIOP_Message_Base::consolidate_node (TAO_Queued_Data
*qd
,
455 ACE_Message_Block
&incoming
)
457 // Look to see whether we had at least parsed the GIOP header ...
458 if (qd
->missing_data () == TAO_MISSING_DATA_UNDEFINED
)
460 // The data length that has been stuck in there during the last
462 size_t const len
= qd
->msg_block ()->length ();
465 if (len
>= TAO_GIOP_MESSAGE_HEADER_LEN
)
467 // inconsistency - this code should have parsed the header
472 // We know that we would have space for
473 // TAO_GIOP_MESSAGE_HEADER_LEN here. So copy that much of data
474 // from the <incoming> into the message block in <qd>
475 size_t const available
= incoming
.length ();
476 size_t const desired
= TAO_GIOP_MESSAGE_HEADER_LEN
- len
;
477 size_t const n_copy
= ace_min (available
, desired
);
479 // paranoid check, but would cause endless looping
485 if (qd
->msg_block ()->copy (incoming
.rd_ptr (), n_copy
) == -1)
490 // Move the rd_ptr () in the incoming message block..
491 incoming
.rd_ptr (n_copy
);
493 // verify sufficient data to parse GIOP header
494 if (qd
->msg_block ()->length () < TAO_GIOP_MESSAGE_HEADER_LEN
)
496 return 0; /* continue */
499 TAO_GIOP_Message_State state
;
501 // Parse the message header now...
502 if (state
.parse_message_header (*qd
->msg_block ()) == -1)
504 if (TAO_debug_level
> 0)
506 TAOLIB_ERROR ((LM_ERROR
,
507 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::consolidate_node, ")
508 ACE_TEXT ("error parsing header\n") ));
512 // Now grow the message block so that we can copy the rest of
513 // the data, the message_block must be able to hold complete message
514 if (ACE_CDR::grow (qd
->msg_block (),
515 state
.message_size ()) == -1) /* GIOP_Header + Payload */
517 // on mem-error get rid of context silently, try to avoid
518 // system calls that might allocate additional memory
522 // Copy the pay load..
523 // Calculate the bytes that needs to be copied in the queue...
524 size_t copy_len
= state
.payload_size ();
526 // If the data that needs to be copied is more than that is
527 // available to us ..
528 if (copy_len
> incoming
.length ())
530 // Calculate the missing data..
531 qd
->missing_data (copy_len
- incoming
.length ());
533 // Set the actual possible copy_len that is available...
534 copy_len
= incoming
.length ();
538 qd
->missing_data (0);
541 // ..now we are set to copy the right amount of data to the
543 if (qd
->msg_block ()->copy (incoming
.rd_ptr (), copy_len
) == -1)
548 // Set the <rd_ptr> of the <incoming>..
549 incoming
.rd_ptr (copy_len
);
551 // Get the other details...
556 // @todo: Need to abstract this out to a separate method...
557 size_t copy_len
= qd
->missing_data ();
559 if (copy_len
> incoming
.length ())
561 // Calculate the missing data..
562 qd
->missing_data (copy_len
- incoming
.length ());
564 // Set the actual possible copy_len that is available...
565 copy_len
= incoming
.length ();
568 // paranoid check for endless-event-looping
574 // Copy the right amount of data in to the node...
575 if (qd
->msg_block ()->copy (incoming
.rd_ptr (), copy_len
) == -1)
580 // Set the <rd_ptr> of the <incoming>..
// NOTE(review): the call below advances the rd_ptr of <qd>'s own message
// block, not of <incoming> as the comment above says (the header branch
// earlier calls incoming.rd_ptr (copy_len)) -- confirm this is intended.
581 qd
->msg_block ()->rd_ptr (copy_len
);
// Server-side entry point for a fully received GIOP request-type message.
// Visible steps:
//   - mark the current thread as an upcall thread on the leader/follower
//     strategy;
//   - select the generator/parser for the message's GIOP version;
//   - build a reply TAO_OutputCDR on top of a stack buffer (repbuf/out_db);
//   - compute read/write offsets into the incoming block, with the read
//     offset advanced past the GIOP header;
//   - pick the data block for the input stream: the message's own block
//     when DONT_DELETE is set, otherwise a duplicate () of it;
//   - when ZIOP is compiled in and the message is flagged compressed,
//     decompress it;
//   - dump the received message when TAO_debug_level > 9;
//   - build a TAO_InputCDR, assign codeset translators, and dispatch on the
//     message type to process_request () or process_locate_request ().
// NOTE(review): the failure paths and the default switch case are not
// visible in this excerpt.
588 TAO_GIOP_Message_Base::process_request_message (TAO_Transport
*transport
,
591 // Set the upcall thread
592 this->orb_core_
->lf_strategy ().set_upcall_thread (this->orb_core_
->leader_follower ());
594 // Get the parser we need to use
595 TAO_GIOP_Message_Generator_Parser
*generator_parser
=
596 this->get_parser (qd
->giop_version ());
598 // A buffer that we will use to initialise the CDR stream. Since we're
599 // allocating the buffer on the stack, we may as well allocate the data
600 // block on the stack too and avoid an allocation inside the message
602 #if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE)
603 char repbuf
[ACE_CDR::DEFAULT_BUFSIZE
] = { 0 };
605 char repbuf
[ACE_CDR::DEFAULT_BUFSIZE
];
606 #endif /* ACE_INITIALIZE_MEMORY_BEFORE_USE */
607 ACE_Data_Block
out_db (sizeof (repbuf
),
608 ACE_Message_Block::MB_DATA
,
610 this->orb_core_
->input_cdr_buffer_allocator (),
612 ACE_Message_Block::DONT_DELETE
,
613 this->orb_core_
->input_cdr_dblock_allocator ());
615 // Initialize an output CDR on the stack
616 // NOTE: Don't jump to a conclusion as to why we are using the
617 // input_cdr and hence the global pool here. These pools will move
618 // to the lanes anyway at some point of time. Further, it would have
619 // been awesome to have this in TSS. But for some reason the cloning
620 // that happens when the ORB gets flow controlled while writing a
621 // reply is messing things up. We crash horribly. Doing this adds a
622 // lock, we need to set things like this -- put stuff in TSS here
623 // and transfer to global memory when we get flow controlled. We
624 // need to work on the message block to get it right!
625 TAO_OutputCDR
output (&out_db
,
626 TAO_ENCAP_BYTE_ORDER
,
627 this->orb_core_
->input_cdr_msgblock_allocator (),
628 this->orb_core_
->orb_params ()->cdr_memcpy_tradeoff (),
629 this->fragmentation_strategy_
,
630 qd
->giop_version ().major_version (),
631 qd
->giop_version ().minor_version ());
633 // Get the read and write positions and header before we steal data.
634 size_t rd_pos
= qd
->msg_block ()->rd_ptr () - qd
->msg_block ()->base ();
635 size_t wr_pos
= qd
->msg_block ()->wr_ptr () - qd
->msg_block ()->base ();
636 rd_pos
+= TAO_GIOP_MESSAGE_HEADER_LEN
;
638 // Create a input CDR stream. We do the following
639 // 1 - If the incoming message block has a data block with a flag
640 // DONT_DELETE (for the data block) we create an input CDR
641 // stream the same way.
642 // 2 - If the incoming message block had a datablock from heap just
643 // use it by duplicating it and make the flag 0.
644 // NOTE: We use the same data block in which we read the message and
645 // we pass it on to the higher layers of the ORB. So we don't do any
646 // copies at all here. The same is also done in the higher layers.
648 ACE_Data_Block
*db
= nullptr;
650 // Get the flag in the message block
651 ACE_Message_Block::Message_Flags flg
= qd
->msg_block ()->self_flags ();
653 if (ACE_BIT_ENABLED (flg
, ACE_Message_Block::DONT_DELETE
))
655 // Use the same datablock
656 db
= qd
->msg_block ()->data_block ();
660 // Use a duplicated datablock as the datablock has come off the
662 db
= qd
->msg_block ()->data_block ()->duplicate ();
664 db
->size (qd
->msg_block ()->length ());
666 #if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP ==1
667 if (qd
->state ().compressed ())
669 if (!this->decompress (&db
, *qd
, rd_pos
, wr_pos
))
671 if (qd
->msg_block ()->data_block () != db
)
673 // qd still owns the original compressed buffer, db now has
674 // the uncompressed data which is on the heap
675 ACE_CLR_BITS (flg
, ACE_Message_Block::DONT_DELETE
);
679 if (TAO_debug_level
> 9)
682 ACE_OS::sprintf (buf
, "Transport[" ACE_SIZE_T_FORMAT_SPECIFIER_ASCII
"] recv",
684 //due to alignment data block has an offset which needs to be corrected
686 reinterpret_cast <u_char
*> (db
->base () + rd_pos
- TAO_GIOP_MESSAGE_HEADER_LEN
),
687 db
->size () + rd_pos
- TAO_GIOP_MESSAGE_HEADER_LEN
);
690 TAO_InputCDR
input_cdr (db
,
695 qd
->giop_version ().major_version (),
696 qd
->giop_version ().minor_version (),
699 transport
->assign_translators(&input_cdr
,&output
);
701 // We know we have some request message. Check whether it is a
702 // GIOP_REQUEST or GIOP_LOCATE_REQUEST to take action.
704 // Once we send the InputCDR stream we need to just forget about
705 // the stream and never touch that again for anything. We basically
706 // lose ownership of the data_block.
708 switch (qd
->msg_type ())
711 // Should be taken care by the state specific invocations. They
712 // could raise an exception or write things in the output CDR
714 return this->process_request (transport
,
719 case GIOP::LocateRequest
:
720 return this->process_locate_request (transport
,
// (Compiled only when TAO_HAS_ZIOP == 1.)
// Decompresses a ZIOP message through the ORB core's ZIOP adapter.
// <db> is passed by address to the adapter together with <qd>. <rd_pos> is
// set to the GIOP header length and <wr_pos> to the size of *db, so callers
// can read the (new) data block from just after the header. An error is
// logged when TAO_debug_level > 0 and decompression is not possible.
// NOTE(review): which branch (success/failure, adapter present/absent) each
// statement belongs to, and the return values, are not visible in this
// excerpt.
729 #if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP ==1
731 TAO_GIOP_Message_Base::decompress (ACE_Data_Block
**db
, TAO_Queued_Data
& qd
,
732 size_t& rd_pos
, size_t& wr_pos
)
734 TAO_ZIOP_Adapter
* adapter
= this->orb_core_
->ziop_adapter ();
737 if (!adapter
->decompress (db
, qd
, *this->orb_core_
))
739 rd_pos
= TAO_GIOP_MESSAGE_HEADER_LEN
;
740 wr_pos
= (*db
)->size();
744 if (TAO_debug_level
> 0)
745 TAOLIB_ERROR ((LM_ERROR
,
746 ACE_TEXT ("TAO (%P|%t) ERROR: Unable to decompress ")
747 ACE_TEXT ("data (Server is not ZIOP enabled).\n")));
// Client-side entry point for a fully received GIOP reply-type message.
// Visible steps:
//   - select the generator/parser for the message's GIOP version;
//   - compute read/write offsets into the incoming block, with the read
//     offset advanced past the GIOP header;
//   - pick the data block for the input stream: the message's own block
//     when DONT_DELETE is set, otherwise a duplicate () of it;
//   - when ZIOP is compiled in and the message is flagged compressed,
//     decompress it;
//   - dump the received message when TAO_debug_level > 9;
//   - build a TAO_InputCDR and parse it as a Reply or LocateReply according
//     to the message type;
//   - hand the stream to the reply parameters, assign codeset translators
//     and dispatch the reply through the transport's mux strategy (tms ());
//     a dispatch failure is logged when TAO_debug_level > 0.
// NOTE(review): `¶ms` in the signature looks like a mis-encoded `&params`
// (HTML entity damage) -- confirm. The conditions around <retval> and the
// returned value are not visible in this excerpt.
756 TAO_GIOP_Message_Base::process_reply_message (
757 TAO_Pluggable_Reply_Params
¶ms
,
760 // Get the parser we need to use
761 TAO_GIOP_Message_Generator_Parser
*generator_parser
=
762 this->get_parser (qd
->giop_version ());
764 // Get the read and write positions before we steal data.
765 size_t rd_pos
= qd
->msg_block ()->rd_ptr () - qd
->msg_block ()->base ();
766 size_t wr_pos
= qd
->msg_block ()->wr_ptr () - qd
->msg_block ()->base ();
767 rd_pos
+= TAO_GIOP_MESSAGE_HEADER_LEN
;
769 ACE_Data_Block
*db
= nullptr;
771 // Get the flag in the message block
772 ACE_Message_Block::Message_Flags flg
= qd
->msg_block ()->self_flags ();
774 if (ACE_BIT_ENABLED (flg
, ACE_Message_Block::DONT_DELETE
))
776 // Use the same datablock
777 db
= qd
->msg_block ()->data_block ();
781 // Use a duplicated datablock as the datablock has come off the
783 db
= qd
->msg_block ()->data_block ()->duplicate ();
785 db
->size (qd
->msg_block ()->length ());
787 #if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP ==1
788 if (qd
->state ().compressed ())
790 if (!this->decompress (&db
, *qd
, rd_pos
, wr_pos
))
792 if (qd
->msg_block ()->data_block () != db
)
794 // qd still owns the original compressed buffer, db now has
795 // the uncompressed data which is on the heap
796 ACE_CLR_BITS (flg
, ACE_Message_Block::DONT_DELETE
);
800 if (TAO_debug_level
> 9)
803 ACE_OS::sprintf (buf
, "Transport[" ACE_SIZE_T_FORMAT_SPECIFIER_ASCII
"] recv",
804 params
.transport_
->id ());
806 reinterpret_cast <u_char
*> (db
->base () + rd_pos
- TAO_GIOP_MESSAGE_HEADER_LEN
),
807 db
->size () + rd_pos
- TAO_GIOP_MESSAGE_HEADER_LEN
);
810 // Create a empty buffer on stack
811 // NOTE: We use the same data block in which we read the message and
812 // we pass it on to the higher layers of the ORB. So we don't do any
813 // copies at all here.
814 TAO_InputCDR
input_cdr (db
,
819 qd
->giop_version ().major_version (),
820 qd
->giop_version ().minor_version (),
823 // We know we have some reply message. Check whether it is a
824 // GIOP_REPLY or GIOP_LOCATE_REPLY to take action.
826 // Once we send the InputCDR stream we need to just forget about
827 // the stream and never touch that again for anything. We basically
828 // lose ownership of the data_block.
831 switch (qd
->msg_type ())
834 // Should be taken care by the state specific parsing
835 retval
= generator_parser
->parse_reply (input_cdr
, params
);
837 case GIOP::LocateReply
:
838 retval
= generator_parser
->parse_locate_reply (input_cdr
, params
);
847 params
.input_cdr_
= &input_cdr
;
848 params
.transport_
->assign_translators (params
.input_cdr_
, nullptr);
850 retval
= params
.transport_
->tms ()->dispatch_reply (params
);
854 // Something really critical happened, we will forget about
855 // every reply on this connection.
856 if (TAO_debug_level
> 0)
857 TAOLIB_ERROR ((LM_ERROR
,
858 ACE_TEXT ("TAO (%P|%t) - GIOP_Message_Base[%d]::process_reply_message, ")
859 ACE_TEXT ("dispatch reply failed\n"),
860 params
.transport_
->id ()));
// Builds a reply message that carries the exception <x>.
// The GIOP and reply headers are produced with generate_reply_header ().
// CORBA::BAD_PARAM has its own handler; any other CORBA::Exception raised
// while marshaling is caught and a debug line is emitted when
// TAO_debug_level > 0.
// NOTE(review): `¶ms` in the signature looks like a mis-encoded `&params`
// (HTML entity damage) -- confirm. The marshaling of <x> itself, the
// handler bodies and the return values are not visible in this excerpt.
866 TAO_GIOP_Message_Base::generate_exception_reply (
868 TAO_Pluggable_Reply_Params_Base
¶ms
,
869 const CORBA::Exception
&x
)
871 // A new try/catch block, but if something goes wrong now we have no
876 // Make the GIOP & reply header.
877 this->generate_reply_header (cdr
, params
);
880 catch (const ::CORBA::BAD_PARAM
&)
884 catch (const ::CORBA::Exception
&)
886 // Now we know that while handling the error another error
887 // happened -> no hope, close connection.
890 if (TAO_debug_level
> 0)
891 TAOLIB_DEBUG ((LM_DEBUG
,
892 ACE_TEXT ("(%P|%t|%N|%l) cannot marshal exception, ")
893 ACE_TEXT ("generate_exception_reply ()\n")));
// Writes the fixed 12-octet GIOP message header into <msg>.
// Octets 4 and 5 receive the major and minor GIOP version, octet 7 the
// message type <type>. Octet 6 (flags) is left for format time, per the
// comment below. The header is written as one octet array and the stream's
// good_bit () is returned, so the result is true only if the write
// succeeded.
// NOTE(review): the initializer for octets 0-3 and 8-11 is not visible in
// this excerpt.
901 TAO_GIOP_Message_Base::write_protocol_header (GIOP::MsgType type
,
902 const TAO_GIOP_Message_Version
&version
,
905 // Reset the message type
908 CORBA::Octet header
[12] =
910 // The following works on non-ASCII platforms, such as MVS (which
918 header
[4] = version
.major
;
919 header
[5] = version
.minor
;
921 // "flags" octet, i.e. header[6] will be set up later when message
922 // is formatted by the transport.
924 header
[7] = static_cast <CORBA::Octet
> (type
); // Message type
// Element count of header[]; elements are single octets, so this is also
// its size in bytes.
926 static ACE_CDR::ULong
const header_size
=
927 sizeof (header
) / sizeof (header
[0]);
929 // Fragmentation should not occur at this point since there are only
930 // 12 bytes in the stream, and fragmentation may only occur when
931 // the stream length >= 16.
932 msg
.write_octet_array (header
, header_size
);
934 return msg
.good_bit ();
// Handles a GIOP Request: parses its header, dispatches the upcall and, if
// needed, sends a forward reply or an exception reply.
// Visible steps:
//   - build a TAO_ServerRequest and parse the request header with <parser>;
//     a parse error raises CORBA::MARSHAL;
//   - let the codeset manager process the service context and assign
//     codeset translators on the transport;
//   - record the request id and whether a response is expected;
//   - dispatch through the ORB core's request dispatcher;
//   - if the request was forwarded, build reply parameters with status
//     LOCATION_FORWARD or LOCATION_FORWARD_PERM, generate the reply header,
//     marshal the forward reference into <output> and send it;
//   - a CORBA::Exception is answered with send_reply_exception () when a
//     response is required, otherwise only logged (TAO_debug_level > 0);
//   - any other C++ exception is mapped to CORBA::UNKNOWN with minor code
//     TAO_UNHANDLED_SERVER_CXX_EXCEPTION and handled the same way.
// NOTE(review): several conditions, the catch-all handler line and the
// return values are not visible in this excerpt.
938 TAO_GIOP_Message_Base::process_request (
939 TAO_Transport
* transport
,
941 TAO_OutputCDR
& output
,
942 TAO_GIOP_Message_Generator_Parser
* parser
)
944 // This will extract the request header, set <response_required>
945 // and <sync_with_server> as appropriate.
946 TAO_ServerRequest
request (this,
952 CORBA::ULong request_id
= 0;
953 CORBA::Boolean response_required
= false;
957 int const parse_error
= parser
->parse_request_header (request
);
959 // Throw an exception if the
960 if (parse_error
!= 0)
961 throw ::CORBA::MARSHAL (0, CORBA::COMPLETED_NO
);
963 TAO_Codeset_Manager
*csm
= request
.orb_core ()->codeset_manager ();
966 csm
->process_service_context (request
);
967 transport
->assign_translators (&cdr
, &output
);
970 request_id
= request
.request_id ();
972 response_required
= request
.response_expected ();
974 CORBA::Object_var forward_to
;
976 // Do this before the reply is sent.
977 this->orb_core_
->request_dispatcher ()->dispatch (
982 if (request
.is_forwarded ())
984 CORBA::Boolean
const permanent_forward_condition
=
985 this->orb_core_
->is_permanent_forward_condition
987 request
.request_service_context ());
989 // We should forward to another object...
990 TAO_Pluggable_Reply_Params_Base reply_params
;
991 reply_params
.request_id_
= request_id
;
992 reply_params
.reply_status (
993 permanent_forward_condition
994 ? GIOP::LOCATION_FORWARD_PERM
995 : GIOP::LOCATION_FORWARD
);
996 reply_params
.svc_ctx_
.length (0);
998 // Send back the reply service context.
999 reply_params
.service_context_notowned (
1000 &request
.reply_service_info ());
1002 output
.message_attributes (request_id
,
1004 TAO_Message_Semantics (TAO_Message_Semantics::TAO_REPLY
),
1007 // Make the GIOP header and Reply header
1008 this->generate_reply_header (output
, reply_params
);
1010 if (!(output
<< forward_to
.in ()))
1012 if (TAO_debug_level
> 0)
1013 TAOLIB_ERROR ((LM_ERROR
,
1014 ACE_TEXT ("TAO (%P|%t) ERROR: Unable to marshal ")
1015 ACE_TEXT ("forward reference.\n")));
1020 output
.more_fragments (false);
1022 int const result
= transport
->send_message (output
,
1025 TAO_Message_Semantics (TAO_Message_Semantics::TAO_REPLY
));
1028 if (TAO_debug_level
> 0)
1030 // No exception but some kind of error, yet a
1031 // response is required.
1032 TAOLIB_ERROR ((LM_ERROR
,
1033 ACE_TEXT ("TAO: (%P|%t|%N|%l) %p: ")
1034 ACE_TEXT ("cannot send reply\n"),
1035 ACE_TEXT ("TAO_GIOP_Message_Base::process_request")));
1041 // Only CORBA exceptions are caught here.
1042 catch ( ::CORBA::Exception
& ex
)
1046 if (response_required
)
1048 result
= this->send_reply_exception (transport
,
1051 &request
.reply_service_info (),
1055 if (TAO_debug_level
> 0)
// NOTE(review): the log text below names "process_connector_request ()"
// although this is process_request () -- confirm whether that is intended.
1057 TAOLIB_ERROR ((LM_ERROR
,
1058 ACE_TEXT ("TAO: (%P|%t|%N|%l) %p: ")
1059 ACE_TEXT ("cannot send exception\n"),
1060 ACE_TEXT ("process_connector_request ()")));
1062 ex
._tao_print_exception (
1063 "TAO_GIOP_Message_Base::process_request[1]");
1068 else if (TAO_debug_level
> 0)
1070 // It is unfortunate that an exception (probably a system
1071 // exception) was thrown by the upcall code (even by the
1072 // user) when the client was not expecting a response.
1073 // However, in this case, we cannot close the connection
1074 // down, since it really isn't the client's fault.
1076 TAOLIB_ERROR ((LM_ERROR
,
1077 ACE_TEXT ("(%P|%t) exception thrown ")
1078 ACE_TEXT ("but client is not waiting a response\n")));
1080 ex
._tao_print_exception (
1081 "TAO_GIOP_Message_Base::process_request[2]");
1088 // @@ TODO some c++ exception or another, but what do we do with
1090 // We are supposed to map it into a CORBA::UNKNOWN exception.
1091 // BTW, this cannot be detected if using the <env> mapping. If
1092 // we have native exceptions but no support for them in the ORB
1093 // we should still be able to catch it. If we don't have native
1094 // exceptions it couldn't have been raised in the first place!
1097 if (response_required
)
1099 CORBA::UNKNOWN
exception (CORBA::SystemException::_tao_minor_code
1100 (TAO_UNHANDLED_SERVER_CXX_EXCEPTION
, 0),
1101 CORBA::COMPLETED_MAYBE
);
1103 if (this->send_reply_exception (transport
,
1106 &request
.reply_service_info (),
1108 && TAO_debug_level
> 0)
1110 TAOLIB_ERROR ((LM_ERROR
,
1111 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_request[3], ")
1113 ACE_TEXT ("cannot send exception\n"),
1114 ACE_TEXT ("process_request ()")));
1115 exception
._tao_print_exception (
1116 "TAO_GIOP_Message_Base::process_request[3]");
1119 else if (TAO_debug_level
> 0)
1121 // It is unfortunate that an exception (probably a system
1122 // exception) was thrown by the upcall code (even by the
1123 // user) when the client was not expecting a response.
1124 // However, in this case, we cannot close the connection
1125 // down, since it really isn't the client's fault.
1126 TAOLIB_ERROR ((LM_ERROR
,
1127 ACE_TEXT ("TAO (%P|%t) exception thrown ")
1128 ACE_TEXT ("but client is not waiting a response\n")));
// Handles a GIOP LocateRequest and sends the corresponding LocateReply.
// Visible steps:
//   - parse the locate request header from <input> with <parser>; a parse
//     error raises CORBA::MARSHAL;
//   - build an object key and a TAO_ServerRequest from the parsed header and
//     dispatch it through the ORB core's request dispatcher;
//   - derive the locate status from the outcome: OBJECT_FORWARD when the
//     request was forwarded, OBJECT_HERE when the reply status is
//     NO_EXCEPTION, UNKNOWN_OBJECT otherwise (including when a CORBA or
//     other C++ exception is caught);
//   - send the reply with make_send_locate_reply ().
// The status starts out as UNKNOWN_OBJECT.
1139 TAO_GIOP_Message_Base::process_locate_request (TAO_Transport
*transport
,
1140 TAO_InputCDR
&input
,
1141 TAO_OutputCDR
&output
,
1142 TAO_GIOP_Message_Generator_Parser
*parser
)
1144 // This will extract the request header, set <response_required> as
1146 TAO_GIOP_Locate_Request_Header
locate_request (input
, this->orb_core_
);
1148 TAO_GIOP_Locate_Status_Msg status_info
;
1151 status_info
.status
= GIOP::UNKNOWN_OBJECT
;
1153 CORBA::Boolean response_required
= true;
1157 int parse_error
= parser
->parse_locate_header (locate_request
);
1159 if (parse_error
!= 0)
1161 throw ::CORBA::MARSHAL (0, CORBA::COMPLETED_NO
);
1164 TAO::ObjectKey
tmp_key (locate_request
.object_key ().length (),
1165 locate_request
.object_key ().length (),
1166 locate_request
.object_key ().get_buffer (),
1169 // Set it to an error state
1171 CORBA::ULong req_id
= locate_request
.request_id ();
1173 // We will send the reply. The ServerRequest class need not send
1175 CORBA::Boolean deferred_reply
= true;
1176 TAO_ServerRequest
server_request (this,
1187 if (parse_error
!= 0)
1189 throw ::CORBA::MARSHAL (0, CORBA::COMPLETED_NO
);
1192 CORBA::Object_var forward_to
;
1194 this->orb_core_
->request_dispatcher ()->dispatch (
1199 if (server_request
.is_forwarded ())
1201 status_info
.status
= GIOP::OBJECT_FORWARD
;
1202 status_info
.forward_location_var
= forward_to
;
1203 if (TAO_debug_level
> 0)
1204 TAOLIB_DEBUG ((LM_DEBUG
,
1205 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_locate_request, ")
1206 ACE_TEXT ("called: forwarding\n")));
1208 else if (server_request
.reply_status () == GIOP::NO_EXCEPTION
)
1210 // We got no exception, so the object is here.
1211 status_info
.status
= GIOP::OBJECT_HERE
;
1212 if (TAO_debug_level
> 0)
1213 TAOLIB_DEBUG ((LM_DEBUG
,
1214 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_locate_request, ")
1215 ACE_TEXT ("found\n")));
1219 // Normal exception, so the object is not here
1220 status_info
.status
= GIOP::UNKNOWN_OBJECT
;
// NOTE(review): unlike the sibling branches, no TAO_debug_level check is
// visible before this log call -- confirm whether it should be guarded.
1221 TAOLIB_DEBUG ((LM_DEBUG
,
1222 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_locate_request, ")
1223 ACE_TEXT ("not here\n")));
1227 catch (const ::CORBA::Exception
&)
1229 // Normal exception, so the object is not here
1230 status_info
.status
= GIOP::UNKNOWN_OBJECT
;
1231 if (TAO_debug_level
> 0)
1232 TAOLIB_DEBUG ((LM_DEBUG
,
1233 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_locate_request, ")
1234 ACE_TEXT ("CORBA exception raised\n")));
1238 // Normal exception, so the object is not here
1239 status_info
.status
= GIOP::UNKNOWN_OBJECT
;
1240 if (TAO_debug_level
> 0)
1241 TAOLIB_DEBUG ((LM_DEBUG
,
1242 ACE_TEXT ("TAO (%P|%t) TAO_GIOP_Message_Base::process_locate_request - ")
1243 ACE_TEXT ("C++ exception raised\n")));
1246 return this->make_send_locate_reply (transport
,
// Builds a GIOP LocateReply in <output> and sends it over <transport>.
// The protocol header is written with message type GIOP::LocateReply using
// the GIOP version stored in <output>; <parser> then writes the locate
// reply header and body for the request id taken from <request>. The
// message is marked as having no more fragments and sent with reply
// semantics. An error is logged when TAO_debug_level > 0.
// NOTE(review): the return value of write_protocol_header () is not checked
// in the visible code; the condition guarding the error log and the
// function's return value are not visible in this excerpt.
1254 TAO_GIOP_Message_Base::make_send_locate_reply (TAO_Transport
*transport
,
1255 TAO_GIOP_Locate_Request_Header
&request
,
1256 TAO_GIOP_Locate_Status_Msg
&status_info
,
1257 TAO_OutputCDR
&output
,
1258 TAO_GIOP_Message_Generator_Parser
*parser
)
1260 TAO_GIOP_Message_Version giop_version
;
1261 output
.get_version (giop_version
);
1263 // Note here we are making the Locate reply header which is *QUITE*
1264 // different from the reply header made by the make_reply () call..
1265 // Make the GIOP message header
1266 this->write_protocol_header (GIOP::LocateReply
, giop_version
, output
);
1268 // This writes the header & body
1269 parser
->write_locate_reply_mesg (output
,
1270 request
.request_id (),
1273 output
.more_fragments (false);
1276 int const result
= transport
->send_message (output
,
1279 TAO_Message_Semantics (TAO_Message_Semantics::TAO_REPLY
));
1281 // Print out message if there is an error
1284 if (TAO_debug_level
> 0)
1286 TAOLIB_ERROR ((LM_ERROR
,
1287 ACE_TEXT ("TAO: (%P|%t) %p: cannot send reply\n"),
1288 ACE_TEXT ("TAO_GIOP_Message_Base::make_send_locate_reply")));
1295 // Send an "I can't understand you" message -- again, the message is
1296 // prefabricated for simplicity. This implies abortive disconnect (at
1297 // the application level, if not at the level of TCP).
1299 // NOTE that IIOP will still benefit from TCP's orderly disconnect.
// Sends a prefabricated, header-only GIOP message over @a transport to
// report an error (see the comment preceding this function).
//
// @param transport Transport used to send the message block chain.
//
// NOTE(review): parts of the initializer, of the ACE_Data_Block constructor
// call, the declaration of `bt` and the tail of the function are missing
// from this listing; the comments below describe only what is visible.
1301 TAO_GIOP_Message_Base::send_error (TAO_Transport
*transport
)
// Fixed-size buffer holding exactly one GIOP message header.
1303 const char error_message
[TAO_GIOP_MESSAGE_HEADER_LEN
] =
1305 // The following works on non-ASCII platforms, such as MVS (which
1311 (CORBA::Octet
) 1, // Use the lowest GIOP version
1313 TAO_ENCAP_BYTE_ORDER
,
// Only dump the outgoing header at very high debug levels.
1318 if (TAO_debug_level
> 9)
1320 this->dump_msg ("send_error",
1321 reinterpret_cast <const u_char
*> (error_message
),
1322 TAO_GIOP_MESSAGE_HEADER_LEN
);
// Data block of header size flagged DONT_DELETE; presumably it wraps
// error_message, but that constructor argument is not visible -- confirm.
1325 ACE_Data_Block
data_block (TAO_GIOP_MESSAGE_HEADER_LEN
,
1326 ACE_Message_Block::MB_DATA
,
1330 ACE_Message_Block::DONT_DELETE
,
// The message block is a local object as well and is also flagged DONT_DELETE.
1332 ACE_Message_Block
message_block(&data_block
,
1333 ACE_Message_Block::DONT_DELETE
);
// Advance the write pointer so the whole header counts as payload.
1334 message_block
.wr_ptr (TAO_GIOP_MESSAGE_HEADER_LEN
);
1337 int const result
= transport
->send_message_block_chain (&message_block
, bt
);
// Failure is only reported when debugging is enabled.
1340 if (TAO_debug_level
> 0)
1341 TAOLIB_DEBUG ((LM_DEBUG
,
1342 ACE_TEXT ("TAO (%N|%l|%P|%t) error sending error to transport %u\n"),
// Selects the generator/parser object matching a GIOP version.
//
// @param version GIOP version; both its major and minor fields are examined.
// @return Pointer to one of the tao_giop_10 / tao_giop_11 / tao_giop_12
//         members of tao_giop_impl_.
// @throws CORBA::INTERNAL (minor code 0, COMPLETED_NO) from two places,
//         presumably the default branches of the two switches for
//         unsupported versions -- the case labels are missing from this
//         listing, so confirm against the full source.
1349 TAO_GIOP_Message_Generator_Parser
*
1350 TAO_GIOP_Message_Base::get_parser (
1351 const TAO_GIOP_Message_Version
&version
) const
1353 switch (version
.major
)
1356 switch (version
.minor
)
// This method is const, so constness is cast away from the member parsers
// in order to hand out a non-const pointer.
1360 const_cast<TAO_GIOP_Message_Generator_Parser_10
*> (
1361 &this->tao_giop_impl_
.tao_giop_10
);
1365 const_cast<TAO_GIOP_Message_Generator_Parser_11
*> (
1366 &this->tao_giop_impl_
.tao_giop_11
);
1370 const_cast<TAO_GIOP_Message_Generator_Parser_12
*> (
1371 &this->tao_giop_impl_
.tao_giop_12
);
1374 throw ::CORBA::INTERNAL (0, CORBA::COMPLETED_NO
);
1379 throw ::CORBA::INTERNAL (0, CORBA::COMPLETED_NO
);
1384 // Server sends an "I'm shutting down now, any requests you've sent me
1385 // can be retried" message to the client. The message is prefab, for
1388 // NOTE: this is IIOP-specific though it doesn't look like it is. It
1389 // relies on a TCP-ism: orderly disconnect, which doesn't exist in all
1390 // transport protocols. Versions of GIOP atop some transport that's
1391 // lacking orderly disconnect must define some transport-specific
1392 // handshaking (e.g. the XNS/SPP handshake convention) in order to
1393 // know that the same transport semantics are provided when shutdown
1394 // is begun with messages "in flight". (IIOP doesn't report false
1395 // errors in the case of "clean shutdown", because it relies on
1396 // orderly disconnect as provided by TCP. This quality of service is
1397 // required to write robust distributed systems.)
// Sends a prefabricated GIOP CloseConnection header over @a transport and
// then closes the transport's connection.
//
// @param version   GIOP version written into the header (major and minor).
// @param transport Transport used for sending and subsequently closed.
//
// NOTE(review): parts of the initializer, of the ACE_Data_Block constructor
// call, the declaration of `bt` and the condition guarding the error log are
// missing from this listing.
1399 TAO_GIOP_Message_Base::
1400 send_close_connection (const TAO_GIOP_Message_Version
&version
,
1401 TAO_Transport
*transport
)
1403 // static CORBA::Octet
1404 // I hate this in every method. Till the time I figure out a way
1405 // around I will have them here hanging around.
1406 const char close_message
[TAO_GIOP_MESSAGE_HEADER_LEN
] =
1408 // The following works on non-ASCII platforms, such as MVS (which
1414 static_cast<char> (version
.major
),
1415 static_cast<char> (version
.minor
),
1416 TAO_ENCAP_BYTE_ORDER
,
1417 GIOP::CloseConnection
,
1421 // It's important that we use a reliable shutdown after we send this
1422 // message, so we know it's received.
1424 // @@ should recv and discard queued data for portability; note
1425 // that this won't block (long) since we never set SO_LINGER
// Only dump the outgoing header at very high debug levels.
1426 if (TAO_debug_level
> 9)
1428 this->dump_msg ("send_close_connection",
1429 reinterpret_cast <const u_char
*> (close_message
),
1430 TAO_GIOP_MESSAGE_HEADER_LEN
);
// Data block of header size flagged DONT_DELETE; presumably it wraps
// close_message, but that constructor argument is not visible -- confirm.
1433 ACE_Data_Block
data_block (TAO_GIOP_MESSAGE_HEADER_LEN
,
1434 ACE_Message_Block::MB_DATA
,
1438 ACE_Message_Block::DONT_DELETE
,
// NOTE(review): unlike send_error (), no DONT_DELETE flag is passed to the
// message block constructor here -- confirm whether that is intentional.
1440 ACE_Message_Block
message_block(&data_block
);
// Advance the write pointer so the whole header counts as payload.
1441 message_block
.wr_ptr (TAO_GIOP_MESSAGE_HEADER_LEN
);
1444 int const result
= transport
->send_message_block_chain (&message_block
, bt
);
1447 if (TAO_debug_level
> 0)
1448 TAOLIB_ERROR ((LM_ERROR
,
1449 ACE_TEXT ("(%P|%t) error closing connection %u, errno = %d\n"),
1450 transport
->id (), ACE_ERRNO_GET
));
// The connection is closed after the send attempt.
1453 transport
->close_connection ();
// NOTE(review): no TAO_debug_level check is visible immediately before this
// debug message, unlike the other log statements in this function -- confirm
// whether it is meant to be emitted unconditionally.
1454 TAOLIB_DEBUG ((LM_DEBUG
,
1455 ACE_TEXT ("(%P|%t) shut down transport, handle %d\n"),
1456 transport
-> id ()));
// Marshals an exception reply for @a request_id into @a output and sends it
// over @a transport with reply semantics.
//
// @param transport  Transport whose send_message () delivers the reply.
// @param output     CDR stream the reply is written to.
// @param request_id Id of the request being answered.
// @param svc_info   Service context list echoed back to the peer; handed to
//                   the reply parameters via service_context_notowned ().
// The exception itself is referred to as `x` in the body; its parameter
// declaration is missing from this listing.
// @return The result of transport->send_message (); the value returned when
//         generate_exception_reply () fails is not visible here.
1460 TAO_GIOP_Message_Base::send_reply_exception (
1461 TAO_Transport
*transport
,
1462 TAO_OutputCDR
&output
,
1463 CORBA::ULong request_id
,
1464 IOP::ServiceContextList
*svc_info
,
1468 TAO_Pluggable_Reply_Params_Base reply_params
;
1469 reply_params
.request_id_
= request_id
;
1470 reply_params
.svc_ctx_
.length (0);
1472 // We are going to send some data
1473 reply_params
.argument_flag_
= true;
1475 // Send back the service context we received. (RTCORBA relies on
1477 reply_params
.service_context_notowned (svc_info
);
// Classify the exception: anything that downcasts to CORBA::SystemException
// is reported as SYSTEM_EXCEPTION; USER_EXCEPTION is set on the other path
// (the `else` line itself is missing from this listing).
1479 if (CORBA::SystemException::_downcast (x
) != nullptr)
1481 reply_params
.reply_status (GIOP::SYSTEM_EXCEPTION
);
1485 reply_params
.reply_status (GIOP::USER_EXCEPTION
);
// Marshal the reply; -1 signals failure (the handling is not visible).
1488 if (this->generate_exception_reply (output
, reply_params
, *x
) == -1)
// Mark the stream as carrying no further fragments before it is sent.
1491 output
.more_fragments (false);
1493 return transport
->send_message (output
, nullptr, nullptr, TAO_Message_Semantics (TAO_Message_Semantics::TAO_REPLY
));
// Logs a one-line summary of a GIOP message (label, version, payload size,
// byte order, message type and request id) followed by a hex dump.
//
// @param label Text prefixed to the summary line.
// The body also reads `ptr` (message bytes) and `len` (total length); their
// parameter declarations are missing from this listing.
//
// NOTE(review): the request id is read at TAO_GIOP_MESSAGE_HEADER_LEN (or
// +4) without any visible check that `len` covers those bytes -- confirm
// that callers always pass a sufficiently long buffer.
1497 TAO_GIOP_Message_Base::dump_msg (const char *label
,
// Debug-level gate; the guarded statement is missing from this listing
// (presumably an early return).
1501 if (TAO_debug_level
< 10)
// NOTE(review): this table has only 14 characters but is indexed below with
// the raw version bytes of the header; a byte above 14 would read past the
// array. Confirm that only validated headers reach this point.
1506 static const char digits
[] = "0123456789ABCD";
1507 static const char *names
[] =
// Translate the message type byte into a name, falling back to
// "UNKNOWN MESSAGE" when it is outside the names table.
1520 const char *message_name
= "UNKNOWN MESSAGE";
1521 u_long slot
= ptr
[TAO_GIOP_MESSAGE_TYPE_OFFSET
];
1522 if (slot
< sizeof (names
) / sizeof (names
[0]))
1523 message_name
= names
[slot
];
// Bit 0 of the flags byte carries the sender's byte order.
1526 int const byte_order
= ptr
[TAO_GIOP_MESSAGE_FLAGS_OFFSET
] & 0x01;
1528 // Get the version info
1529 CORBA::Octet
const major
= ptr
[TAO_GIOP_VERSION_MAJOR_OFFSET
];
1530 CORBA::Octet
const minor
= ptr
[TAO_GIOP_VERSION_MINOR_OFFSET
];
1532 // request/reply id.
// `id` starts out pointing at the local `tmp`, which is zero-initialized.
1533 CORBA::ULong tmp
= 0;
1534 CORBA::ULong
*id
= std::addressof(tmp
);
1535 char *tmp_id
= nullptr;
// Request, Reply and Fragment: the id location depends on the GIOP version.
1537 if (ptr
[TAO_GIOP_MESSAGE_TYPE_OFFSET
] == GIOP::Request
||
1538 ptr
[TAO_GIOP_MESSAGE_TYPE_OFFSET
] == GIOP::Reply
||
1539 ptr
[TAO_GIOP_MESSAGE_TYPE_OFFSET
] == GIOP::Fragment
)
// Before GIOP 1.2 the id is taken 4 bytes after the header.
1541 if (major
== 1 && minor
< 2)
1543 // @@ Only works if ServiceContextList is empty....
1544 tmp_id
= (char * ) (ptr
+ TAO_GIOP_MESSAGE_HEADER_LEN
+ 4);
// Otherwise the id is taken directly after the header.
1548 tmp_id
= (char * ) (ptr
+ TAO_GIOP_MESSAGE_HEADER_LEN
);
1550 #if !defined (ACE_DISABLE_SWAP_ON_READ)
// Same byte order as ours: point straight at the bytes in the buffer.
1551 if (byte_order
== TAO_ENCAP_BYTE_ORDER
)
1553 id
= reinterpret_cast <ACE_CDR::ULong
*> (tmp_id
);
// Byte-swap the four id bytes into the storage `id` points at.
1557 ACE_CDR::swap_4 (tmp_id
, reinterpret_cast <char*> (id
));
1560 id
= reinterpret_cast <ACE_CDR::ULong
*> (tmp_id
);
1561 #endif /* ACE_DISABLE_SWAP_ON_READ */
// CancelRequest, LocateRequest and LocateReply: the id is taken directly
// after the header.
1564 else if (ptr
[TAO_GIOP_MESSAGE_TYPE_OFFSET
] == GIOP::CancelRequest
||
1565 ptr
[TAO_GIOP_MESSAGE_TYPE_OFFSET
] == GIOP::LocateRequest
||
1566 ptr
[TAO_GIOP_MESSAGE_TYPE_OFFSET
] == GIOP::LocateReply
)
1568 tmp_id
= (char * ) (ptr
+ TAO_GIOP_MESSAGE_HEADER_LEN
);
1569 #if !defined (ACE_DISABLE_SWAP_ON_READ)
1570 if (byte_order
== TAO_ENCAP_BYTE_ORDER
)
1572 id
= reinterpret_cast <ACE_CDR::ULong
*> (tmp_id
);
1576 ACE_CDR::swap_4 (tmp_id
, reinterpret_cast <char*> (id
));
1579 id
= reinterpret_cast <ACE_CDR::ULong
*> (tmp_id
);
1580 #endif /* ACE_DISABLE_SWAP_ON_READ */
// Summary line; some of the arguments are missing from this listing.
1584 TAOLIB_DEBUG ((LM_DEBUG
,
1585 ACE_TEXT("TAO (%P|%t) - GIOP_Message_Base::dump_msg, ")
1586 ACE_TEXT("%C GIOP message v%c.%c, %d data bytes, %s endian, ")
1587 ACE_TEXT("Type %C[%u]\n"),
1589 digits
[ptr
[TAO_GIOP_VERSION_MAJOR_OFFSET
]],
1590 digits
[ptr
[TAO_GIOP_VERSION_MINOR_OFFSET
]],
1591 len
- TAO_GIOP_MESSAGE_HEADER_LEN
,
1592 (byte_order
== TAO_ENCAP_BYTE_ORDER
) ? ACE_TEXT("my") : ACE_TEXT("other"),
// Hex dump of the message; the buffer and length arguments are not visible.
1595 TAOLIB_HEX_DUMP ((LM_DEBUG
,
1598 ACE_TEXT ("GIOP message")));
// Locate-reply header generation hook. Both parameters are left unnamed
// (their names are commented out), so neither is used by the body.
// NOTE(review): the body and return type are missing from this listing.
1602 TAO_GIOP_Message_Base::generate_locate_reply_header (
1603 TAO_OutputCDR
& /*cdr*/,
1604 TAO_Pluggable_Reply_Params_Base
& /*params*/)
// Reports whether the GIOP version set on @a msg is ready for bidirectional
// use, by delegating to the generator/parser for that version.
//
// @param msg Output stream whose GIOP version selects the parser.
// @return Whatever the selected parser's is_ready_for_bidirectional ()
//         returns. get_parser () may throw CORBA::INTERNAL.
1610 TAO_GIOP_Message_Base::is_ready_for_bidirectional (TAO_OutputCDR
&msg
) const
1612 TAO_GIOP_Message_Version giop_version
;
1614 msg
.get_version (giop_version
);
1616 // Get the parser we need to use
1617 TAO_GIOP_Message_Generator_Parser
*generator_parser
= this->get_parser (giop_version
);
1619 // We don't really know, so ask the generator and parser objects that
1621 // @@ TODO: Need to make this faster, instead of making virtual
1622 // call, try to do the check within this class
1623 return generator_parser
->is_ready_for_bidirectional ();
// Creates a TAO_Queued_Data object backed by a freshly created input CDR
// data block large enough for @a sz bytes plus alignment slack.
//
// @param sz Number of payload bytes the data block must be able to hold.
// @return nullptr on the failure path shown below; the success path (and the
//         condition selecting the failure path) is missing from this listing.
1627 TAO_GIOP_Message_Base::make_queued_data (size_t sz
)
1629 // Make a datablock for the size requested + something. The
1630 // "something" is required because we are going to align the data
1631 // block in the message block. During alignment we could lose some
1632 // bytes. As we may not know how many bytes will be lost, we will
1633 // allocate ACE_CDR::MAX_ALIGNMENT extra.
1634 ACE_Data_Block
*db
=
1635 this->orb_core_
->create_input_cdr_data_block (sz
+
1636 ACE_CDR::MAX_ALIGNMENT
);
// The queued-data object is built with allocators obtained from the ORB
// core; the remaining arguments are not visible.
1638 TAO_Queued_Data
*qd
=
1639 TAO_Queued_Data::make_queued_data (
1640 this->orb_core_
->transport_message_buffer_allocator (),
1641 this->orb_core_
->input_cdr_msgblock_allocator (),
// Failure path: log (when debugging is enabled) and return a null pointer.
1646 if (TAO_debug_level
> 0)
1648 TAOLIB_ERROR ((LM_ERROR
,
1649 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::make_queued_data, ")
1650 ACE_TEXT ("out of memory, failed to allocate queued data object\n")));
1653 return nullptr; // NULL pointer
// Returns the fixed GIOP message header length, TAO_GIOP_MESSAGE_HEADER_LEN.
1660 TAO_GIOP_Message_Base::header_length () const
1662 return TAO_GIOP_MESSAGE_HEADER_LEN
;
// Returns the fragment header length for @a giop_version, as reported by the
// generator/parser selected for that version.
//
// @param giop_version GIOP version used to pick the parser; get_parser ()
//                     may throw CORBA::INTERNAL.
1666 TAO_GIOP_Message_Base::fragment_header_length (
1667 const TAO_GIOP_Message_Version
& giop_version
) const
1669 // Get the parser we need to use
1670 TAO_GIOP_Message_Generator_Parser
*generator_parser
=
1671 this->get_parser (giop_version
);
1673 return generator_parser
->fragment_header_length ();
// Extracts the request id from a queued GIOP message without consuming it.
//
// @param qd         Queued message; its message block, GIOP version and
//                   message type are read.
// @param request_id Receives the id demarshaled from the message.
// Callers in this file treat a return value of -1 as failure; the return
// statements themselves are missing from this listing.
1677 TAO_GIOP_Message_Base::parse_request_id (const TAO_Queued_Data
*qd
,
1678 CORBA::ULong
&request_id
) const
1680 // Get the read and write positions before we steal data.
1681 size_t rd_pos
= qd
->msg_block ()->rd_ptr () - qd
->msg_block ()->base ();
1682 size_t wr_pos
= qd
->msg_block ()->wr_ptr () - qd
->msg_block ()->base ();
// Parsing starts right after the GIOP message header.
1683 rd_pos
+= TAO_GIOP_MESSAGE_HEADER_LEN
;
1685 // Create an input CDR stream. We do the following
1686 // 1 - If the incoming message block has a data block with a flag
1687 // DONT_DELETE (for the data block) we create an input CDR
1688 // stream the same way.
1689 // 2 - If the incoming message block had a datablock from heap just
1690 // use it by duplicating it and make the flag 0.
1691 // NOTE: We use the same data block in which we read the message and
1692 // we pass it on to the higher layers of the ORB. So we don't do any
1693 // copies at all here. The same is also done in the higher layers.
1695 ACE_Message_Block::Message_Flags flg
= 0;
1696 ACE_Data_Block
*db
= nullptr;
1698 // Get the flag in the message block
1699 flg
= qd
->msg_block ()->self_flags ();
1701 if (ACE_BIT_ENABLED (flg
, ACE_Message_Block::DONT_DELETE
))
1703 // Use the same datablock
1704 db
= qd
->msg_block ()->data_block ();
1708 // Use a duplicated datablock as the datablock has come off the
1710 db
= qd
->msg_block ()->data_block ()->duplicate ();
// The stream is created with the message's own GIOP version; the remaining
// constructor arguments are not visible.
1713 TAO_InputCDR
input_cdr (db
,
1718 qd
->giop_version ().major_version (),
1719 qd
->giop_version ().minor_version (),
// GIOP 1.0 and 1.1: the layout depends on the message type.
1722 if (qd
->giop_version ().major
== 1 &&
1723 (qd
->giop_version ().minor
== 0 || qd
->giop_version ().minor
== 1))
1725 switch (qd
->msg_type ())
// Here the service context list is demarshaled first and the request id
// after it (the case labels for this branch are not visible).
1730 IOP::ServiceContextList service_context
;
1732 if ((input_cdr
>> service_context
)
1733 && (input_cdr
>> request_id
))
// For these message types the request id is demarshaled directly.
1739 case GIOP::CancelRequest
:
1740 case GIOP::LocateRequest
:
1741 case GIOP::LocateReply
:
1743 if ((input_cdr
>> request_id
))
// Remaining GIOP versions.
1755 switch (qd
->msg_type ())
1759 case GIOP::Fragment
:
1760 case GIOP::CancelRequest
:
1761 case GIOP::LocateRequest
:
1762 case GIOP::LocateReply
:
1764 // Dealing with GIOP-1.2, the request-id is located directly
1765 // behind the GIOP-Header. This is true for all message
1766 // types that might be sent in form of fragments or
1768 if ((input_cdr
>> request_id
))
1782 /* @return -1 error, 0 ok, +1 outstanding fragments */
// Handles one incoming fragment: either parks it on fragment_stack_ until
// the final fragment arrives, or chains the parked fragments belonging to
// the same message in front of the final one and consolidates the result.
//
// @param qd  Incoming fragment. It is released on the error paths shown
//            below and pushed onto fragment_stack_ when more fragments are
//            expected.
// @param msg Set to nullptr while fragments are still outstanding; the
//            assignment on the success path is missing from this listing.
//
// NOTE(review): `tail` is initialized to nullptr and dereferenced further
// down; the statement assigning it (presumably from qd) is missing from
// this listing, as are several return statements.
1784 TAO_GIOP_Message_Base::consolidate_fragmented_message (
1785 TAO_Queued_Data
* qd
,
1786 TAO_Queued_Data
*& msg
)
// Fragments popped from fragment_stack_ that do not belong to this message
// are parked here and pushed back afterwards.
1788 TAO::Incoming_Message_Stack reverse_stack
;
1790 TAO_Queued_Data
*tail
= nullptr;
1791 TAO_Queued_Data
*head
= nullptr;
1794 // CONSOLIDATE FRAGMENTED MESSAGE
1797 // check for error-condition
1803 if (qd
->giop_version ().major
== 1 && qd
->giop_version ().minor
== 0)
1805 TAO_Queued_Data::release (qd
);
1806 return -1; // error: GIOP-1.0 does not support fragments
1809 // If this is not the last fragment, push it onto stack for later processing
1810 if (qd
->more_fragments ())
1812 this->fragment_stack_
.push (qd
);
1814 msg
= nullptr; // no consolidated message available yet
1815 return 1; // status: more messages expected.
1820 // Add the current message block to the end of the chain
1821 // after adjusting the read pointer to skip the header(s)
// NOTE(review): fragment_header_length () is declared to take a
// TAO_GIOP_Message_Version, yet only major_version () is passed here. This
// presumably compiles through an implicit conversion, in which case the
// minor version used is whatever that conversion defaults to -- confirm
// whether tail->giop_version () was intended.
1822 size_t const header_adjustment
=
1823 this->header_length () +
1824 this->fragment_header_length (tail
->giop_version ().major_version ());
1826 if (tail
->msg_block ()->length () < header_adjustment
)
1828 // buffer length not sufficient
1829 TAO_Queued_Data::release (qd
);
1833 // duplicate code to speed up both processes, for GIOP-1.1 and GIOP-1.2
1834 if (tail
->giop_version ().major_version () == 1 && tail
->giop_version ().minor_version () == 1)
// GIOP 1.1: fragments carry no request id, so stacked entries are matched
// on version, more_fragments () and minimum length only.
1838 while (this->fragment_stack_
.pop (head
) != -1)
1840 if (head
->more_fragments () &&
1841 head
->giop_version ().major_version () == 1 &&
1842 head
->giop_version ().minor_version () == 1 &&
1843 head
->msg_block ()->length () >= header_adjustment
)
1845 // adjust the read-pointer, skip the fragment header
1846 tail
->msg_block ()->rd_ptr(header_adjustment
);
// Chain the later fragment behind the earlier one, detach the block from
// `tail` and release the now-empty queued-data wrapper.
1848 head
->msg_block ()->cont (tail
->msg_block ());
1850 tail
->msg_block (nullptr);
1852 TAO_Queued_Data::release (tail
);
// Not part of this message: keep it for re-insertion.
1858 reverse_stack
.push (head
);
// Other versions: stacked fragments are matched on the request id parsed
// from the final fragment.
1866 CORBA::ULong tmp_request_id
= 0;
1867 if (this->parse_request_id (tail
, tmp_request_id
) == -1)
1872 CORBA::ULong
const request_id
= tmp_request_id
;
1874 while (this->fragment_stack_
.pop (head
) != -1)
1876 CORBA::ULong head_request_id
= 0;
1877 int parse_status
= 0;
// NOTE(review): `major >= 1 && minor >= 2` tests the two version fields
// independently, so it is not a strict ">= 1.2" comparison -- confirm this
// is acceptable for the versions that can occur.
1879 if (head
->more_fragments () &&
1880 head
->giop_version ().major_version () >= 1 &&
1881 head
->giop_version ().minor_version () >= 2 &&
1882 head
->msg_block ()->length () >= header_adjustment
&&
1883 (parse_status
= this->parse_request_id (head
, head_request_id
)) != -1 &&
1884 request_id
== head_request_id
)
1886 // adjust the read-pointer, skip the fragment header
1887 tail
->msg_block ()->rd_ptr(header_adjustment
);
1889 head
->msg_block ()->cont (tail
->msg_block ());
1891 tail
->msg_block (nullptr);
1893 TAO_Queued_Data::release (tail
);
// A stacked entry whose request id could not be parsed is dropped.
1899 if (parse_status
== -1)
1901 TAO_Queued_Data::release (head
);
1905 reverse_stack
.push (head
);
// Put the unrelated fragments back onto the fragment stack.
1911 while (reverse_stack
.pop (head
) != -1)
1913 this->fragment_stack_
.push (head
);
1916 if (tail
->consolidate () == -1)
1918 // memory allocation failed
1919 TAO_Queued_Data::release (tail
);
// Drops every stacked fragment that belongs to the request named by a
// CancelRequest message; all other fragments are pushed back onto
// fragment_stack_.
//
// @param cancel_request Queued CancelRequest; its request id is parsed and
//                       compared against the stacked fragments.
// NOTE(review): the return statements (including the one taken when the
// cancel request id cannot be parsed) are missing from this listing.
1930 TAO_GIOP_Message_Base::discard_fragmented_message (const TAO_Queued_Data
*cancel_request
)
1932 // We must extract the specific request-id from message-buffer
1933 // and remove all fragments from stack that match this request-id.
1935 TAO::Incoming_Message_Stack reverse_stack
;
1937 CORBA::ULong cancel_request_id
;
1939 if (this->parse_request_id (cancel_request
, cancel_request_id
) == -1)
1944 TAO_Queued_Data
*head
= nullptr;
// Move everything onto a second stack so that the loop below pops the
// entries in the reverse of fragment_stack_'s pop order.
1947 while (this->fragment_stack_
.pop (head
) != -1)
1949 reverse_stack
.push (head
);
1952 bool discard_all_GIOP11_messages
= false;
1954 // Now we are able to process message in order they have arrived.
1955 // If the cancel_request_id matches to GIOP-1.1 message, all succeeding
1956 // fragments belong to this message and must be discarded.
1957 // Note: GIOP-1.1 fragment headers don't have any request-id encoded. If the
1958 // cancel_request_id matches GIOP-1.2 messages, all GIOP-1.2 fragments
1959 // having encoded the request id will be discarded.
1960 while (reverse_stack
.pop (head
) != -1)
1962 CORBA::ULong head_request_id
;
// Case 1: a GIOP <= 1.1 non-Fragment message carrying the cancelled id.
// Drop it and remember to drop the GIOP <= 1.1 entries that follow.
1964 if (head
->giop_version ().major_version () == 1 &&
1965 head
->giop_version ().minor_version () <= 1 &&
1966 head
->msg_type () != GIOP::Fragment
&& // GIOP11 fragment does not provide request id
1967 this->parse_request_id (head
, head_request_id
) >= 0 &&
1968 cancel_request_id
== head_request_id
)
1970 TAO_Queued_Data::release (head
);
1971 discard_all_GIOP11_messages
= true;
// Case 2: a later GIOP <= 1.1 entry once the flag has been set.
1973 else if (head
->giop_version ().major_version () == 1 &&
1974 head
->giop_version ().minor_version () <= 1 &&
1975 discard_all_GIOP11_messages
)
1977 TAO_Queued_Data::release (head
);
// Case 3: an entry with minor version >= 2 whose parsed id matches.
1979 else if (head
->giop_version ().major_version () >= 1 &&
1980 head
->giop_version ().minor_version () >= 2 &&
1981 this->parse_request_id (head
, head_request_id
) >= 0 &&
1982 cancel_request_id
== head_request_id
)
1984 TAO_Queued_Data::release (head
);
// Anything else is unrelated to the cancelled request and is kept.
1988 this->fragment_stack_
.push (head
);
1995 TAO_GIOP_Fragmentation_Strategy
*
1996 TAO_GIOP_Message_Base::fragmentation_strategy ()
1998 return this->fragmentation_strategy_
;
// Sets the byte-order and more-fragments bits in the flags byte of the GIOP
// header stored at the start of @a msg's buffer.
//
// @param msg Output stream whose buffer is patched in place; the buffer's
//            constness is cast away to do so.
// Bits are only ever set here (ACE_SET_BITS); nothing is cleared.
2002 TAO_GIOP_Message_Base::set_giop_flags (TAO_OutputCDR
& msg
) const
2004 CORBA::Octet
* const buf
=
2005 reinterpret_cast<CORBA::Octet
*> (const_cast<char *> (msg
.buffer ()));
// Version bytes are read from the header already present in the buffer.
2007 CORBA::Octet
const & major
= buf
[TAO_GIOP_VERSION_MAJOR_OFFSET
];
2008 CORBA::Octet
const & minor
= buf
[TAO_GIOP_VERSION_MINOR_OFFSET
];
2010 // Flags for the GIOP protocol header "flags" field.
2011 CORBA::Octet
& flags
= buf
[TAO_GIOP_MESSAGE_FLAGS_OFFSET
];
2013 // Least significant bit: Byte order
// The native byte order is XOR-ed with the stream's byte-swap setting.
2014 ACE_SET_BITS (flags
, TAO_ENCAP_BYTE_ORDER
^ msg
.do_byte_swap ());
2016 // Second least significant bit: More fragments
2018 // Only supported in GIOP 1.1 or better.
2019 if (!(major
<= 1 && minor
== 0))
2020 ACE_SET_BITS (flags
, msg
.more_fragments () << 1);
2023 TAO_END_VERSIONED_NAMESPACE_DECL