Merge pull request #2218 from jwillemsen/jwi-pthreadsigmask
[ACE_TAO.git] / TAO / tao / GIOP_Message_Base.cpp
blob5b8b99b641d5b173aa477c49b51f9ed9ea625da5
1 // -*- C++ -*-
2 #include "tao/GIOP_Message_Base.h"
3 #include "tao/operation_details.h"
4 #include "tao/debug.h"
5 #include "tao/ORB_Core.h"
6 #include "tao/TAO_Server_Request.h"
7 #include "tao/GIOP_Message_Locate_Header.h"
8 #include "tao/Transport.h"
9 #include "tao/Transport_Mux_Strategy.h"
10 #include "tao/LF_Strategy.h"
11 #include "tao/Request_Dispatcher.h"
12 #include "tao/Codeset_Manager.h"
13 #include "tao/SystemException.h"
14 #include "tao/ZIOP_Adapter.h"
15 #include "ace/Min_Max.h"
17 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
19 TAO_GIOP_Message_Base::TAO_GIOP_Message_Base (TAO_ORB_Core *orb_core,
20 TAO_Transport *transport,
21 size_t input_cdr_size)
22 : orb_core_ (orb_core)
23 , fragmentation_strategy_ (orb_core->fragmentation_strategy (transport))
24 , out_stream_ (nullptr,
25 input_cdr_size,
26 TAO_ENCAP_BYTE_ORDER,
27 orb_core->output_cdr_buffer_allocator (),
28 orb_core->output_cdr_dblock_allocator (),
29 orb_core->output_cdr_msgblock_allocator (),
30 orb_core->orb_params ()->cdr_memcpy_tradeoff (),
31 fragmentation_strategy_,
32 TAO_DEF_GIOP_MAJOR,
33 TAO_DEF_GIOP_MINOR)
35 #if defined (TAO_HAS_MONITOR_POINTS) && (TAO_HAS_MONITOR_POINTS == 1)
36 const int nibbles = 2 * sizeof (size_t);
37 char hex_string[nibbles + 1];
38 ACE_OS::sprintf (hex_string,
39 "%8.8X",
40 transport->id ());
41 hex_string[nibbles] = '\0';
42 ACE_CString monitor_name ("OutputCDR_");
43 monitor_name += hex_string;
44 this->out_stream_.register_monitor (monitor_name.c_str ());
45 #endif /* TAO_HAS_MONITOR_POINTS==1 */
49 TAO_GIOP_Message_Base::~TAO_GIOP_Message_Base ()
51 #if defined (TAO_HAS_MONITOR_POINTS) && (TAO_HAS_MONITOR_POINTS == 1)
52 this->out_stream_.unregister_monitor ();
53 #endif /* TAO_HAS_MONITOR_POINTS==1 */
54 delete fragmentation_strategy_;
57 void
58 TAO_GIOP_Message_Base::init (CORBA::Octet major, CORBA::Octet minor)
60 // Set the giop version of the out stream
61 this->out_stream_.set_version (major, minor);
64 TAO_OutputCDR &
65 TAO_GIOP_Message_Base::out_stream ()
67 return this->out_stream_;
70 int
71 TAO_GIOP_Message_Base::generate_request_header (
72 TAO_Operation_Details &op,
73 TAO_Target_Specification &spec,
74 TAO_OutputCDR &cdr)
76 // Get a parser for us
77 TAO_GIOP_Message_Version giop_version;
79 cdr.get_version (giop_version);
81 // Write the GIOP header first
82 if (!this->write_protocol_header (GIOP::Request, giop_version, cdr))
84 if (TAO_debug_level)
86 TAOLIB_ERROR ((LM_ERROR,
87 ACE_TEXT ("(%P|%t) Error in writing GIOP header\n")));
90 return -1;
93 // Get the parser we need to use
94 TAO_GIOP_Message_Generator_Parser *generator_parser =
95 this->get_parser (giop_version);
97 // Now call the implementation for the rest of the header
98 if (!generator_parser->write_request_header (op, spec, cdr))
100 if (TAO_debug_level)
101 TAOLIB_ERROR ((LM_ERROR,
102 ACE_TEXT ("(%P|%t) Error in writing request header\n")));
104 return -1;
107 return 0;
111 TAO_GIOP_Message_Base::generate_locate_request_header (
112 TAO_Operation_Details &op,
113 TAO_Target_Specification &spec,
114 TAO_OutputCDR &cdr)
116 TAO_GIOP_Message_Version giop_version;
118 cdr.get_version (giop_version);
120 // Get the parser we need to use
121 TAO_GIOP_Message_Generator_Parser *generator_parser =
122 this->get_parser (giop_version);
124 // Write the GIOP header first
125 if (!this->write_protocol_header (GIOP::LocateRequest, giop_version, cdr))
127 if (TAO_debug_level)
128 TAOLIB_ERROR ((LM_ERROR,
129 ACE_TEXT ("(%P|%t) Error in writing GIOP header\n")));
131 return -1;
134 // Now call the implementation for the rest of the header
135 if (!generator_parser->write_locate_request_header
136 (op.request_id (), spec, cdr))
138 if (TAO_debug_level)
139 TAOLIB_ERROR ((LM_ERROR,
140 ACE_TEXT ("(%P|%t) Error in writing locate request header\n")));
143 return -1;
146 return 0;
150 TAO_GIOP_Message_Base::generate_reply_header (
151 TAO_OutputCDR &cdr,
152 TAO_Pluggable_Reply_Params_Base &params)
154 TAO_GIOP_Message_Version giop_version;
156 cdr.get_version (giop_version);
158 // Write the GIOP header first
159 if (!this->write_protocol_header (GIOP::Reply, giop_version, cdr))
161 if (TAO_debug_level)
162 TAOLIB_ERROR ((LM_ERROR,
163 ACE_TEXT ("(%P|%t) Error in writing GIOP header\n")));
165 return -1;
170 // Get the parser we need to use
171 TAO_GIOP_Message_Generator_Parser *generator_parser =
172 this->get_parser (giop_version);
174 // Now call the implementation for the rest of the header
175 if (!generator_parser->write_reply_header (cdr, params))
177 if (TAO_debug_level > 4)
178 TAOLIB_ERROR ((LM_ERROR,
179 ACE_TEXT ("(%P|%t) Error in writing reply ")
180 ACE_TEXT ("header\n")));
182 return -1;
185 catch (const ::CORBA::Exception& ex)
187 if (TAO_debug_level > 4)
188 ex._tao_print_exception (
189 ACE_TEXT ("TAO_GIOP_Message_Base::generate_reply_header"));
191 return -1;
194 return 0;
198 TAO_GIOP_Message_Base::generate_fragment_header (TAO_OutputCDR & cdr,
199 CORBA::ULong request_id)
201 TAO_GIOP_Message_Version giop_version;
203 cdr.get_version (giop_version);
205 // GIOP fragments are supported in GIOP 1.1 and better, but TAO only
206 // supports them in 1.2 or better since GIOP 1.1 fragments do not
207 // have a fragment message header.
208 if (giop_version.major == 1 && giop_version.minor < 2)
209 return -1;
211 // Get the parser we need to use
212 TAO_GIOP_Message_Generator_Parser *generator_parser =
213 this->get_parser (giop_version);
215 // Write the GIOP header first
216 if (!this->write_protocol_header (GIOP::Fragment, giop_version, cdr)
217 || !generator_parser->write_fragment_header (cdr, request_id))
219 if (TAO_debug_level)
220 TAOLIB_ERROR ((LM_ERROR,
221 ACE_TEXT ("(%P|%t) Error in writing GIOP header\n")));
223 return -1;
226 return 0;
230 TAO_GIOP_Message_Base::dump_consolidated_msg (TAO_OutputCDR &stream)
232 // Check whether the output cdr stream is build up of multiple
233 // messageblocks. If so, consolidate them to one block that can be
234 // dumped
235 ACE_Message_Block* consolidated_block = nullptr;
236 char *buf = const_cast <char*> (stream.buffer ());
237 size_t const total_len = stream.total_length ();
238 if (stream.begin()->cont () != nullptr)
240 ACE_NEW_RETURN (consolidated_block, ACE_Message_Block, 0);
241 ACE_CDR::consolidate (consolidated_block, stream.begin ());
242 buf = (char *) (consolidated_block->rd_ptr ());
245 this->dump_msg ("send", reinterpret_cast <u_char *> (buf), total_len);
247 delete consolidated_block;
248 consolidated_block = nullptr;
250 return 0;
254 TAO_GIOP_Message_Base::format_message (TAO_OutputCDR &stream, TAO_Stub *stub, TAO_ServerRequest *request)
256 this->set_giop_flags (stream);
258 bool log_msg = TAO_debug_level > 9;
259 if (stub || request)
261 #if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP != 0
262 TAO_ZIOP_Adapter* ziop_adapter = this->orb_core_->ziop_adapter ();
263 if (ziop_adapter)
265 if (log_msg)
267 this->dump_consolidated_msg (stream);
270 const bool compressed=
271 stub ?
272 ziop_adapter->marshal_data (stream, *stub) :
273 ziop_adapter->marshal_data (stream, *this->orb_core_, request);
275 if (log_msg && !compressed)
277 TAOLIB_DEBUG ((LM_DEBUG,
278 ACE_TEXT ("TAO (%P|%t) - ")
279 ACE_TEXT ("TAO_GIOP_Message_Base::format_message, ")
280 ACE_TEXT ("GIOP message not compressed\n")));
282 // no need to log. If compressed->ZIOP library dumps message
283 // if not compressed (due to failure or policy settings)
284 // message hasn't changed and was allready dumped
285 // prior to compression...
286 log_msg = false;
289 #endif /* TAO_HAS_ZIOP */
292 // Length of all buffers.
293 size_t const total_len = stream.total_length ();
295 // NOTE: Here would also be a fine place to calculate a digital
296 // signature for the message and place it into a preallocated slot
297 // in the "ServiceContext". Similarly, this is a good spot to
298 // encrypt messages (or just the message bodies) if that's needed in
299 // this particular environment and that isn't handled by the
300 // networking infrastructure (e.g., IPSEC).
302 char *buf = const_cast <char*> (stream.buffer ());
304 CORBA::ULong bodylen = static_cast <CORBA::ULong>
305 (total_len - TAO_GIOP_MESSAGE_HEADER_LEN);
307 #if !defined (ACE_ENABLE_SWAP_ON_WRITE)
308 *(reinterpret_cast <CORBA::ULong *> (buf +
309 TAO_GIOP_MESSAGE_SIZE_OFFSET)) = bodylen;
310 #else
311 if (!stream.do_byte_swap ())
312 *(reinterpret_cast <CORBA::ULong *>
313 (buf + TAO_GIOP_MESSAGE_SIZE_OFFSET)) = bodylen;
314 else
315 ACE_CDR::swap_4 (reinterpret_cast <char *> (&bodylen),
316 buf + TAO_GIOP_MESSAGE_SIZE_OFFSET);
317 #endif /* ACE_ENABLE_SWAP_ON_WRITE */
319 if (log_msg)
321 this->dump_consolidated_msg (stream);
324 return 0;
328 TAO_GIOP_Message_Base::parse_next_message (TAO_Queued_Data &qd,
329 size_t &mesg_length)
331 if (qd.msg_block ()->length () < TAO_GIOP_MESSAGE_HEADER_LEN)
333 qd.missing_data (TAO_MISSING_DATA_UNDEFINED);
335 return 0; /* incomplete header */
337 else
339 TAO_GIOP_Message_State state;
341 if (state.parse_message_header (*(qd.msg_block ())) == -1)
343 return -1;
346 size_t const message_size = state.message_size (); /* Header + Payload */
348 if (message_size > qd.msg_block ()->length ())
350 qd.missing_data (message_size - qd.msg_block ()->length ());
352 else
354 qd.missing_data (0);
357 /* init out-parameters */
358 qd.state (state);
359 mesg_length = message_size;
361 return 1; /* complete header */
366 TAO_GIOP_Message_Base::extract_next_message (ACE_Message_Block &incoming,
367 TAO_Queued_Data *&qd)
369 if (incoming.length () < TAO_GIOP_MESSAGE_HEADER_LEN)
371 if (incoming.length () > 0)
373 // Optimize memory usage, we dont know actual message size
374 // so far, but allocate enough space to hold small GIOP
375 // messages. This way we avoid expensive "grow" operation
376 // for small messages.
377 size_t const default_buf_size = ACE_CDR::DEFAULT_BUFSIZE;
379 // Make a node which has at least message block of the size
380 // of MESSAGE_HEADER_LEN.
381 size_t const buf_size = ace_max (TAO_GIOP_MESSAGE_HEADER_LEN,
382 default_buf_size);
384 // POST: buf_size >= TAO_GIOP_MESSAGE_HEADER_LEN
386 qd = this->make_queued_data (buf_size);
388 if (qd == nullptr)
390 if (TAO_debug_level > 0)
392 TAOLIB_ERROR((LM_ERROR,
393 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::extract_next_message, ")
394 ACE_TEXT ("out of memory\n")));
396 return -1;
399 qd->msg_block ()->copy (incoming.rd_ptr (), incoming.length ());
401 incoming.rd_ptr (incoming.length ()); // consume all available data
403 qd->missing_data (TAO_MISSING_DATA_UNDEFINED);
405 else
407 // handle not initialized variables
408 qd = nullptr; // reset
411 return 0;
414 TAO_GIOP_Message_State state;
415 if (state.parse_message_header (incoming) == -1)
417 return -1;
420 size_t copying_len = state.message_size ();
422 qd = this->make_queued_data (copying_len);
424 if (qd == nullptr)
426 if (TAO_debug_level > 0)
428 TAOLIB_ERROR ((LM_ERROR,
429 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::extract_next_message, ")
430 ACE_TEXT ("out of memory\n")));
432 return -1;
435 if (copying_len > incoming.length ())
437 qd->missing_data (copying_len - incoming.length ());
438 copying_len = incoming.length ();
440 else
442 qd->missing_data (0);
445 qd->msg_block ()->copy (incoming.rd_ptr (), copying_len);
447 incoming.rd_ptr (copying_len);
448 qd->state (state);
450 return 1;
454 TAO_GIOP_Message_Base::consolidate_node (TAO_Queued_Data *qd,
455 ACE_Message_Block &incoming)
457 // Look to see whether we had atleast parsed the GIOP header ...
458 if (qd->missing_data () == TAO_MISSING_DATA_UNDEFINED)
460 // The data length that has been stuck in there during the last
461 // read ....
462 size_t const len = qd->msg_block ()->length ();
464 // paranoid check
465 if (len >= TAO_GIOP_MESSAGE_HEADER_LEN)
467 // inconsistency - this code should have parsed the header
468 // so far
469 return -1;
472 // We know that we would have space for
473 // TAO_GIOP_MESSAGE_HEADER_LEN here. So copy that much of data
474 // from the <incoming> into the message block in <qd>
475 size_t const available = incoming.length ();
476 size_t const desired = TAO_GIOP_MESSAGE_HEADER_LEN - len;
477 size_t const n_copy = ace_min (available, desired);
479 // paranoid check, but would cause endless looping
480 if (n_copy == 0)
482 return -1;
485 if (qd->msg_block ()->copy (incoming.rd_ptr (), n_copy) == -1)
487 return -1;
490 // Move the rd_ptr () in the incoming message block..
491 incoming.rd_ptr (n_copy);
493 // verify sufficient data to parse GIOP header
494 if (qd->msg_block ()->length () < TAO_GIOP_MESSAGE_HEADER_LEN)
496 return 0; /* continue */
499 TAO_GIOP_Message_State state;
501 // Parse the message header now...
502 if (state.parse_message_header (*qd->msg_block ()) == -1)
504 if (TAO_debug_level > 0)
506 TAOLIB_ERROR ((LM_ERROR,
507 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::consolidate_node, ")
508 ACE_TEXT ("error parsing header\n") ));
510 return -1;
512 // Now grow the message block so that we can copy the rest of
513 // the data, the message_block must be able to hold complete message
514 if (ACE_CDR::grow (qd->msg_block (),
515 state.message_size ()) == -1) /* GIOP_Header + Payload */
517 // on mem-error get rid of context silently, try to avoid
518 // system calls that might allocate additional memory
519 return -1;
522 // Copy the pay load..
523 // Calculate the bytes that needs to be copied in the queue...
524 size_t copy_len = state.payload_size ();
526 // If the data that needs to be copied is more than that is
527 // available to us ..
528 if (copy_len > incoming.length ())
530 // Calculate the missing data..
531 qd->missing_data (copy_len - incoming.length ());
533 // Set the actual possible copy_len that is available...
534 copy_len = incoming.length ();
536 else
538 qd->missing_data (0);
541 // ..now we are set to copy the right amount of data to the
542 // node..
543 if (qd->msg_block ()->copy (incoming.rd_ptr (), copy_len) == -1)
545 return -1;
548 // Set the <rd_ptr> of the <incoming>..
549 incoming.rd_ptr (copy_len);
551 // Get the other details...
552 qd->state (state);
554 else
556 // @todo: Need to abstract this out to a separate method...
557 size_t copy_len = qd->missing_data ();
559 if (copy_len > incoming.length ())
561 // Calculate the missing data..
562 qd->missing_data (copy_len - incoming.length ());
564 // Set the actual possible copy_len that is available...
565 copy_len = incoming.length ();
568 // paranoid check for endless-event-looping
569 if (copy_len == 0)
571 return -1;
574 // Copy the right amount of data in to the node...
575 if (qd->msg_block ()->copy (incoming.rd_ptr (), copy_len) == -1)
577 return -1;
580 // Set the <rd_ptr> of the <incoming>..
581 qd->msg_block ()->rd_ptr (copy_len);
584 return 0;
// Dispatch one incoming request-type message held in <qd>.
//
// Steps performed, in order:
//  1. mark the current thread as an upcall thread;
//  2. build an output CDR stream on top of a stack buffer for the reply;
//  3. build an input CDR stream over the data block of <qd> (shared when
//     the block is flagged DONT_DELETE, duplicated otherwise), first
//     decompressing it when ZIOP is compiled in and the message state
//     says it is compressed;
//  4. hand both streams to process_request() for GIOP::Request or to
//     process_locate_request() for GIOP::LocateRequest.
//
// Returns the result of the selected handler, or -1 for any other
// message type or when decompression fails.
588 TAO_GIOP_Message_Base::process_request_message (TAO_Transport *transport,
589 TAO_Queued_Data *qd)
591 // Set the upcall thread
592 this->orb_core_->lf_strategy ().set_upcall_thread (this->orb_core_->leader_follower ());
594 // Get the parser we need to use
595 TAO_GIOP_Message_Generator_Parser *generator_parser =
596 this->get_parser (qd->giop_version ());
598 // A buffer that we will use to initialise the CDR stream. Since we're
599 // allocating the buffer on the stack, we may as well allocate the data
600 // block on the stack too and avoid an allocation inside the message
601 // block of the CDR.
602 #if defined (ACE_INITIALIZE_MEMORY_BEFORE_USE)
603 char repbuf[ACE_CDR::DEFAULT_BUFSIZE] = { 0 };
604 #else
605 char repbuf[ACE_CDR::DEFAULT_BUFSIZE];
606 #endif /* ACE_INITIALIZE_MEMORY_BEFORE_USE */
// The data block wraps <repbuf> and is flagged DONT_DELETE, so the
// stack buffer is never freed through the block.
607 ACE_Data_Block out_db (sizeof (repbuf),
608 ACE_Message_Block::MB_DATA,
609 repbuf,
610 this->orb_core_->input_cdr_buffer_allocator (),
611 nullptr,
612 ACE_Message_Block::DONT_DELETE,
613 this->orb_core_->input_cdr_dblock_allocator ());
615 // Initialize an output CDR on the stack
616 // NOTE: Don't jump to a conclusion as to why we are using the
617 // input_cdr and hence the global pool here. These pools will move
618 // to the lanes anyway at some point of time. Further, it would have
619 // been awesome to have this in TSS. But for some reason the cloning
620 // that happens when the ORB gets flow controlled while writing a
621 // reply is messing things up. We crash horribly. Doing this adds a
622 // lock, we need to set things like this -- put stuff in TSS here
623 // and transfer to global memory when we get flow controlled. We
624 // need to work on the message block to get it right!
625 TAO_OutputCDR output (&out_db,
626 TAO_ENCAP_BYTE_ORDER,
627 this->orb_core_->input_cdr_msgblock_allocator (),
628 this->orb_core_->orb_params ()->cdr_memcpy_tradeoff (),
629 this->fragmentation_strategy_,
630 qd->giop_version ().major_version (),
631 qd->giop_version ().minor_version ());
633 // Get the read and write positions and header before we steal data.
// Both positions are offsets from the base of the message block; the
// read position is then moved past the GIOP header.
634 size_t rd_pos = qd->msg_block ()->rd_ptr () - qd->msg_block ()->base ();
635 size_t wr_pos = qd->msg_block ()->wr_ptr () - qd->msg_block ()->base ();
636 rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN;
638 // Create an input CDR stream. We do the following
639 // 1 - If the incoming message block has a data block with a flag
640 // DONT_DELETE (for the data block) we create an input CDR
641 // stream the same way.
642 // 2 - If the incoming message block had a datablock from heap just
643 // use it by duplicating it and make the flag 0.
644 // NOTE: We use the same data block in which we read the message and
645 // we pass it on to the higher layers of the ORB. So we don't do any
646 // copies at all here. The same is also done in the higher layers.
648 ACE_Data_Block *db = nullptr;
650 // Get the flag in the message block
651 ACE_Message_Block::Message_Flags flg = qd->msg_block ()->self_flags ();
653 if (ACE_BIT_ENABLED (flg, ACE_Message_Block::DONT_DELETE))
655 // Use the same datablock
656 db = qd->msg_block ()->data_block ();
658 else
660 // Use a duplicated datablock as the datablock has come off the
661 // heap.
662 db = qd->msg_block ()->data_block ()->duplicate ();
664 db->size (qd->msg_block ()->length ());
666 #if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP ==1
667 if (qd->state ().compressed ())
// NOTE(review): on decompression failure this returns without
// releasing <db>, which may be a duplicate taken above -- confirm
// whether the adapter releases it.
669 if (!this->decompress (&db, *qd, rd_pos, wr_pos))
670 return -1;
671 if (qd->msg_block ()->data_block () != db)
673 // qd still owns the original compressed buffer, db now has
674 // the uncompressed data which is on the heap
675 ACE_CLR_BITS (flg, ACE_Message_Block::DONT_DELETE);
678 #endif
// Received messages are dumped only at debug level 10 and above.
679 if (TAO_debug_level > 9)
681 char buf[48];
682 ACE_OS::sprintf (buf, "Transport[" ACE_SIZE_T_FORMAT_SPECIFIER_ASCII "] recv",
683 transport->id ());
684 // Due to alignment the data block has an offset which needs to be corrected
685 this->dump_msg (buf,
686 reinterpret_cast <u_char *> (db->base () + rd_pos - TAO_GIOP_MESSAGE_HEADER_LEN),
687 db->size () + rd_pos - TAO_GIOP_MESSAGE_HEADER_LEN);
690 TAO_InputCDR input_cdr (db,
691 flg,
692 rd_pos,
693 wr_pos,
694 qd->byte_order (),
695 qd->giop_version ().major_version (),
696 qd->giop_version ().minor_version (),
697 this->orb_core_);
699 transport->assign_translators(&input_cdr,&output);
701 // We know we have some request message. Check whether it is a
702 // GIOP_REQUEST or GIOP_LOCATE_REQUEST to take action.
704 // Once we send the InputCDR stream we need to just forget about
705 // the stream and never touch that again for anything. We basically
706 // lose ownership of the data_block.
708 switch (qd->msg_type ())
710 case GIOP::Request:
711 // Should be taken care of by the state-specific invocations. They
712 // could raise an exception or write things in the output CDR
713 // stream
714 return this->process_request (transport,
715 input_cdr,
716 output,
717 generator_parser);
719 case GIOP::LocateRequest:
720 return this->process_locate_request (transport,
721 input_cdr,
722 output,
723 generator_parser);
// Any other message type is not a request and is rejected.
724 default:
725 return -1;
#if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP ==1
/**
 * Decompress a ZIOP message through the ORB core's ZIOP adapter.
 *
 * @param db      In/out: data block handed to the adapter.
 * @param qd      Queued data describing the message.
 * @param rd_pos  Out: set to TAO_GIOP_MESSAGE_HEADER_LEN on success.
 * @param wr_pos  Out: set to the size of *db on success.
 * @return true on success; false if no adapter is available or the
 *         adapter reports failure (positions are left untouched then).
 */
bool
TAO_GIOP_Message_Base::decompress (ACE_Data_Block **db, TAO_Queued_Data& qd,
                                   size_t& rd_pos, size_t& wr_pos)
{
  TAO_ZIOP_Adapter* const ziop = this->orb_core_->ziop_adapter ();

  // Without an adapter a compressed message cannot be handled at all.
  if (!ziop)
    {
      if (TAO_debug_level > 0)
        {
          TAOLIB_ERROR ((LM_ERROR,
                         ACE_TEXT ("TAO (%P|%t) ERROR: Unable to decompress ")
                         ACE_TEXT ("data (Server is not ZIOP enabled).\n")));
        }

      return false;
    }

  if (!ziop->decompress (db, qd, *this->orb_core_))
    {
      return false;
    }

  // Positions now refer to the block left in *db by the adapter.
  rd_pos = TAO_GIOP_MESSAGE_HEADER_LEN;
  wr_pos = (*db)->size();

  return true;
}
#endif
// Parse one incoming reply-type message held in <qd> and dispatch it.
//
// An input CDR stream is built over the data block of <qd> (shared when
// the block is flagged DONT_DELETE, duplicated otherwise; decompressed
// first when ZIOP is compiled in and the message is compressed). The
// version-specific parser then fills <params> from a GIOP::Reply or
// GIOP::LocateReply, and the reply is dispatched through the transport's
// mux strategy (tms ()).
//
// Returns -1 when decompression or parsing fails or the message type is
// neither Reply nor LocateReply; otherwise the result of dispatch_reply().
756 TAO_GIOP_Message_Base::process_reply_message (
757 TAO_Pluggable_Reply_Params &params,
758 TAO_Queued_Data *qd)
760 // Get the parser we need to use
761 TAO_GIOP_Message_Generator_Parser *generator_parser =
762 this->get_parser (qd->giop_version ());
764 // Get the read and write positions before we steal data.
// Both positions are offsets from the base of the message block; the
// read position is then moved past the GIOP header.
765 size_t rd_pos = qd->msg_block ()->rd_ptr () - qd->msg_block ()->base ();
766 size_t wr_pos = qd->msg_block ()->wr_ptr () - qd->msg_block ()->base ();
767 rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN;
769 ACE_Data_Block *db = nullptr;
771 // Get the flag in the message block
772 ACE_Message_Block::Message_Flags flg = qd->msg_block ()->self_flags ();
774 if (ACE_BIT_ENABLED (flg, ACE_Message_Block::DONT_DELETE))
776 // Use the same datablock
777 db = qd->msg_block ()->data_block ();
779 else
781 // Use a duplicated datablock as the datablock has come off the
782 // heap.
783 db = qd->msg_block ()->data_block ()->duplicate ();
785 db->size (qd->msg_block ()->length ());
787 #if defined (TAO_HAS_ZIOP) && TAO_HAS_ZIOP ==1
788 if (qd->state ().compressed ())
// NOTE(review): on decompression failure this returns without
// releasing <db>, which may be a duplicate taken above -- confirm
// whether the adapter releases it.
790 if (!this->decompress (&db, *qd, rd_pos, wr_pos))
791 return -1;
792 if (qd->msg_block ()->data_block () != db)
794 // qd still owns the original compressed buffer, db now has
795 // the uncompressed data which is on the heap
796 ACE_CLR_BITS (flg, ACE_Message_Block::DONT_DELETE);
799 #endif
// Received messages are dumped only at debug level 10 and above.
800 if (TAO_debug_level > 9)
802 char buf[48];
803 ACE_OS::sprintf (buf, "Transport[" ACE_SIZE_T_FORMAT_SPECIFIER_ASCII "] recv",
804 params.transport_->id ());
805 this->dump_msg (buf,
806 reinterpret_cast <u_char *> (db->base () + rd_pos - TAO_GIOP_MESSAGE_HEADER_LEN),
807 db->size () + rd_pos - TAO_GIOP_MESSAGE_HEADER_LEN);
810 // Create the input CDR stream on the stack
811 // NOTE: We use the same data block in which we read the message and
812 // we pass it on to the higher layers of the ORB. So we don't do any
813 // copies at all here.
814 TAO_InputCDR input_cdr (db,
815 flg,
816 rd_pos,
817 wr_pos,
818 qd->byte_order (),
819 qd->giop_version ().major_version (),
820 qd->giop_version ().minor_version (),
821 this->orb_core_);
823 // We know we have some reply message. Check whether it is a
824 // GIOP_REPLY or GIOP_LOCATE_REPLY to take action.
826 // Once we send the InputCDR stream we need to just forget about
827 // the stream and never touch that again for anything. We basically
828 // lose ownership of the data_block.
829 int retval = 0;
831 switch (qd->msg_type ())
833 case GIOP::Reply:
834 // Should be taken care of by the state-specific parsing
835 retval = generator_parser->parse_reply (input_cdr, params);
836 break;
837 case GIOP::LocateReply:
838 retval = generator_parser->parse_locate_reply (input_cdr, params);
839 break;
// Any other message type is not a reply and is rejected.
840 default:
841 retval = -1;
844 if (retval == -1)
845 return retval;
// NOTE(review): <params> is given the address of the local stream;
// presumably dispatch_reply() is done with it before this function
// returns -- confirm against the mux strategy.
847 params.input_cdr_ = &input_cdr;
848 params.transport_->assign_translators (params.input_cdr_, nullptr);
850 retval = params.transport_->tms ()->dispatch_reply (params);
852 if (retval == -1)
854 // Something really critical happened, we will forget about
855 // every reply on this connection.
// NOTE(review): "[%d]" is given the transport id, which the sprintf
// above formats with ACE_SIZE_T_FORMAT_SPECIFIER_ASCII -- confirm the
// conversion is right for a size_t.
856 if (TAO_debug_level > 0)
857 TAOLIB_ERROR ((LM_ERROR,
858 ACE_TEXT ("TAO (%P|%t) - GIOP_Message_Base[%d]::process_reply_message, ")
859 ACE_TEXT ("dispatch reply failed\n"),
860 params.transport_->id ()));
862 return retval;
866 TAO_GIOP_Message_Base::generate_exception_reply (
867 TAO_OutputCDR &cdr,
868 TAO_Pluggable_Reply_Params_Base &params,
869 const CORBA::Exception &x)
871 // A new try/catch block, but if something goes wrong now we have no
872 // hope, just abort.
876 // Make the GIOP & reply header.
877 this->generate_reply_header (cdr, params);
878 x._tao_encode (cdr);
880 catch (const ::CORBA::BAD_PARAM&)
882 throw;
884 catch (const ::CORBA::Exception&)
886 // Now we know that while handling the error an other error
887 // happened -> no hope, close connection.
889 // Close the handle.
890 if (TAO_debug_level > 0)
891 TAOLIB_DEBUG ((LM_DEBUG,
892 ACE_TEXT ("(%P|%t|%N|%l) cannot marshal exception, ")
893 ACE_TEXT ("generate_exception_reply ()\n")));
894 return -1;
897 return 0;
901 TAO_GIOP_Message_Base::write_protocol_header (GIOP::MsgType type,
902 const TAO_GIOP_Message_Version &version,
903 TAO_OutputCDR &msg)
905 // Reset the message type
906 msg.reset ();
908 CORBA::Octet header[12] =
910 // The following works on non-ASCII platforms, such as MVS (which
911 // uses EBCDIC).
912 0x47, // 'G'
913 0x49, // 'I'
914 0x4f, // 'O'
915 0x50 // 'P'
918 header[4] = version.major;
919 header[5] = version.minor;
921 // "flags" octet, i.e. header[6] will be set up later when message
922 // is formatted by the transport.
924 header[7] = static_cast <CORBA::Octet> (type); // Message type
926 static ACE_CDR::ULong const header_size =
927 sizeof (header) / sizeof (header[0]);
929 // Fragmentation should not occur at this point since there are only
930 // 12 bytes in the stream, and fragmentation may only occur when
931 // the stream length >= 16.
932 msg.write_octet_array (header, header_size);
934 return msg.good_bit ();
// Handle one GIOP Request: parse its header from <cdr>, dispatch it
// through the ORB core's request dispatcher and, when the request was
// forwarded, marshal and send a LOCATION_FORWARD(_PERM) reply via
// <output>.
//
// Exceptions raised while doing so are handled here: when the client
// expects a response an exception reply is sent (CORBA::UNKNOWN for
// non-CORBA C++ exceptions), otherwise the exception is only logged.
//
// Returns 0 on the normal path, the result of send_message() on the
// forward path (-1 if the forward reference cannot be marshaled), and the
// result of send_reply_exception() or 0 from the CORBA exception handler.
938 TAO_GIOP_Message_Base::process_request (
939 TAO_Transport * transport,
940 TAO_InputCDR & cdr,
941 TAO_OutputCDR & output,
942 TAO_GIOP_Message_Generator_Parser * parser)
944 // This will extract the request header, set <response_required>
945 // and <sync_with_server> as appropriate.
946 TAO_ServerRequest request (this,
947 cdr,
948 output,
949 transport,
950 this->orb_core_);
// Declared outside the try block so the exception handlers can use them.
952 CORBA::ULong request_id = 0;
953 CORBA::Boolean response_required = false;
957 int const parse_error = parser->parse_request_header (request);
959 // Throw a MARSHAL exception if the request header could not be parsed
960 if (parse_error != 0)
961 throw ::CORBA::MARSHAL (0, CORBA::COMPLETED_NO);
963 TAO_Codeset_Manager *csm = request.orb_core ()->codeset_manager ();
964 if (csm)
// The translators are (re)assigned after the codeset manager has
// processed the request's service context.
966 csm->process_service_context (request);
967 transport->assign_translators (&cdr, &output);
970 request_id = request.request_id ();
972 response_required = request.response_expected ();
974 CORBA::Object_var forward_to;
976 // Do this before the reply is sent.
977 this->orb_core_->request_dispatcher ()->dispatch (
978 this->orb_core_,
979 request,
980 forward_to);
982 if (request.is_forwarded ())
984 CORBA::Boolean const permanent_forward_condition =
985 this->orb_core_->is_permanent_forward_condition
986 (forward_to.in (),
987 request.request_service_context ());
989 // We should forward to another object...
990 TAO_Pluggable_Reply_Params_Base reply_params;
991 reply_params.request_id_ = request_id;
992 reply_params.reply_status (
993 permanent_forward_condition
994 ? GIOP::LOCATION_FORWARD_PERM
995 : GIOP::LOCATION_FORWARD);
996 reply_params.svc_ctx_.length (0);
998 // Send back the reply service context.
999 reply_params.service_context_notowned (
1000 &request.reply_service_info ());
1002 output.message_attributes (request_id,
1003 nullptr,
1004 TAO_Message_Semantics (TAO_Message_Semantics::TAO_REPLY),
1005 nullptr);
1007 // Make the GIOP header and Reply header
// NOTE(review): the return value of generate_reply_header() is not
// checked here -- confirm that a failure is caught by the marshaling
// check below.
1008 this->generate_reply_header (output, reply_params);
1010 if (!(output << forward_to.in ()))
1012 if (TAO_debug_level > 0)
1013 TAOLIB_ERROR ((LM_ERROR,
1014 ACE_TEXT ("TAO (%P|%t) ERROR: Unable to marshal ")
1015 ACE_TEXT ("forward reference.\n")));
1017 return -1;
1020 output.more_fragments (false);
1022 int const result = transport->send_message (output,
1023 nullptr,
1024 &request,
1025 TAO_Message_Semantics (TAO_Message_Semantics::TAO_REPLY));
1026 if (result == -1)
1028 if (TAO_debug_level > 0)
1030 // No exception but some kind of error, yet a
1031 // response is required.
1032 TAOLIB_ERROR ((LM_ERROR,
1033 ACE_TEXT ("TAO: (%P|%t|%N|%l) %p: ")
1034 ACE_TEXT ("cannot send reply\n"),
1035 ACE_TEXT ("TAO_GIOP_Message_Base::process_request")));
1038 return result;
1041 // Only CORBA exceptions are caught here.
1042 catch ( ::CORBA::Exception& ex)
1044 int result = 0;
1046 if (response_required)
1048 result = this->send_reply_exception (transport,
1049 output,
1050 request_id,
1051 &request.reply_service_info (),
1052 &ex);
1053 if (result == -1)
1055 if (TAO_debug_level > 0)
// NOTE(review): the label below says "process_connector_request ()"
// although this function is process_request -- looks stale.
1057 TAOLIB_ERROR ((LM_ERROR,
1058 ACE_TEXT ("TAO: (%P|%t|%N|%l) %p: ")
1059 ACE_TEXT ("cannot send exception\n"),
1060 ACE_TEXT ("process_connector_request ()")));
1062 ex._tao_print_exception (
1063 "TAO_GIOP_Message_Base::process_request[1]");
1068 else if (TAO_debug_level > 0)
1070 // It is unfortunate that an exception (probably a system
1071 // exception) was thrown by the upcall code (even by the
1072 // user) when the client was not expecting a response.
1073 // However, in this case, we cannot close the connection
1074 // down, since it really isn't the client's fault.
1076 TAOLIB_ERROR ((LM_ERROR,
1077 ACE_TEXT ("(%P|%t) exception thrown ")
1078 ACE_TEXT ("but client is not waiting a response\n")));
1080 ex._tao_print_exception (
1081 "TAO_GIOP_Message_Base::process_request[2]");
1084 return result;
1086 catch (...)
1088 // @@ TODO some C++ exception or another, but what do we do with
1089 // it?
1090 // We are supposed to map it into a CORBA::UNKNOWN exception.
1091 // BTW, this cannot be detected if using the <env> mapping. If
1092 // we have native exceptions but no support for them in the ORB
1093 // we should still be able to catch it. If we don't have native
1094 // exceptions it couldn't have been raised in the first place!
// NOTE(review): unlike the CORBA exception handler above, <result> is
// never assigned in this handler, so it returns 0 even when sending
// the exception reply fails -- confirm this is intended.
1095 int result = 0;
1097 if (response_required)
1099 CORBA::UNKNOWN exception (CORBA::SystemException::_tao_minor_code
1100 (TAO_UNHANDLED_SERVER_CXX_EXCEPTION, 0),
1101 CORBA::COMPLETED_MAYBE);
1103 if (this->send_reply_exception (transport,
1104 output,
1105 request_id,
1106 &request.reply_service_info (),
1107 &exception) == -1
1108 && TAO_debug_level > 0)
1110 TAOLIB_ERROR ((LM_ERROR,
1111 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_request[3], ")
1112 ACE_TEXT ("%p: ")
1113 ACE_TEXT ("cannot send exception\n"),
1114 ACE_TEXT ("process_request ()")));
1115 exception._tao_print_exception (
1116 "TAO_GIOP_Message_Base::process_request[3]");
1119 else if (TAO_debug_level > 0)
1121 // It is unfortunate that an exception (probably a system
1122 // exception) was thrown by the upcall code (even by the
1123 // user) when the client was not expecting a response.
1124 // However, in this case, we cannot close the connection
1125 // down, since it really isn't the client's fault.
1126 TAOLIB_ERROR ((LM_ERROR,
1127 ACE_TEXT ("TAO (%P|%t) exception thrown ")
1128 ACE_TEXT ("but client is not waiting a response\n")));
1131 return result;
// Normal path: the request was dispatched and not forwarded.
1134 return 0;
1139 TAO_GIOP_Message_Base::process_locate_request (TAO_Transport *transport,
1140 TAO_InputCDR &input,
1141 TAO_OutputCDR &output,
1142 TAO_GIOP_Message_Generator_Parser *parser)
1144 // This will extract the request header, set <response_required> as
1145 // appropriate.
1146 TAO_GIOP_Locate_Request_Header locate_request (input, this->orb_core_);
1148 TAO_GIOP_Locate_Status_Msg status_info;
1150 // Defaulting.
1151 status_info.status = GIOP::UNKNOWN_OBJECT;
1153 CORBA::Boolean response_required = true;
1157 int parse_error = parser->parse_locate_header (locate_request);
1159 if (parse_error != 0)
1161 throw ::CORBA::MARSHAL (0, CORBA::COMPLETED_NO);
1164 TAO::ObjectKey tmp_key (locate_request.object_key ().length (),
1165 locate_request.object_key ().length (),
1166 locate_request.object_key ().get_buffer (),
1169 // Set it to an error state
1170 parse_error = 1;
1171 CORBA::ULong req_id = locate_request.request_id ();
1173 // We will send the reply. The ServerRequest class need not send
1174 // the reply
1175 CORBA::Boolean deferred_reply = true;
1176 TAO_ServerRequest server_request (this,
1177 req_id,
1178 response_required,
1179 deferred_reply,
1180 tmp_key,
1181 "_non_existent",
1182 output,
1183 transport,
1184 this->orb_core_,
1185 parse_error);
1187 if (parse_error != 0)
1189 throw ::CORBA::MARSHAL (0, CORBA::COMPLETED_NO);
1192 CORBA::Object_var forward_to;
1194 this->orb_core_->request_dispatcher ()->dispatch (
1195 this->orb_core_,
1196 server_request,
1197 forward_to);
1199 if (server_request.is_forwarded ())
1201 status_info.status = GIOP::OBJECT_FORWARD;
1202 status_info.forward_location_var = forward_to;
1203 if (TAO_debug_level > 0)
1204 TAOLIB_DEBUG ((LM_DEBUG,
1205 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_locate_request, ")
1206 ACE_TEXT ("called: forwarding\n")));
1208 else if (server_request.reply_status () == GIOP::NO_EXCEPTION)
1210 // We got no exception, so the object is here.
1211 status_info.status = GIOP::OBJECT_HERE;
1212 if (TAO_debug_level > 0)
1213 TAOLIB_DEBUG ((LM_DEBUG,
1214 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_locate_request, ")
1215 ACE_TEXT ("found\n")));
1217 else
1219 // Normal exception, so the object is not here
1220 status_info.status = GIOP::UNKNOWN_OBJECT;
1221 TAOLIB_DEBUG ((LM_DEBUG,
1222 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_locate_request, ")
1223 ACE_TEXT ("not here\n")));
1227 catch (const ::CORBA::Exception&)
1229 // Normal exception, so the object is not here
1230 status_info.status = GIOP::UNKNOWN_OBJECT;
1231 if (TAO_debug_level > 0)
1232 TAOLIB_DEBUG ((LM_DEBUG,
1233 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::process_locate_request, ")
1234 ACE_TEXT ("CORBA exception raised\n")));
1236 catch (...)
1238 // Normal exception, so the object is not here
1239 status_info.status = GIOP::UNKNOWN_OBJECT;
1240 if (TAO_debug_level > 0)
1241 TAOLIB_DEBUG ((LM_DEBUG,
1242 ACE_TEXT ("TAO (%P|%t) TAO_GIOP_Message_Base::process_locate_request - ")
1243 ACE_TEXT ("C++ exception raised\n")));
1246 return this->make_send_locate_reply (transport,
1247 locate_request,
1248 status_info,
1249 output,
1250 parser);
1254 TAO_GIOP_Message_Base::make_send_locate_reply (TAO_Transport *transport,
1255 TAO_GIOP_Locate_Request_Header &request,
1256 TAO_GIOP_Locate_Status_Msg &status_info,
1257 TAO_OutputCDR &output,
1258 TAO_GIOP_Message_Generator_Parser *parser)
1260 TAO_GIOP_Message_Version giop_version;
1261 output.get_version (giop_version);
1263 // Note here we are making the Locate reply header which is *QUITE*
1264 // different from the reply header made by the make_reply () call..
1265 // Make the GIOP message header
1266 this->write_protocol_header (GIOP::LocateReply, giop_version, output);
1268 // This writes the header & body
1269 parser->write_locate_reply_mesg (output,
1270 request.request_id (),
1271 status_info);
1273 output.more_fragments (false);
1275 // Send the message
1276 int const result = transport->send_message (output,
1277 nullptr,
1278 nullptr,
1279 TAO_Message_Semantics (TAO_Message_Semantics::TAO_REPLY));
1281 // Print out message if there is an error
1282 if (result == -1)
1284 if (TAO_debug_level > 0)
1286 TAOLIB_ERROR ((LM_ERROR,
1287 ACE_TEXT ("TAO: (%P|%t) %p: cannot send reply\n"),
1288 ACE_TEXT ("TAO_GIOP_Message_Base::make_send_locate_reply")));
1292 return result;
1295 // Send an "I can't understand you" message -- again, the message is
1296 // prefabricated for simplicity. This implies abortive disconnect (at
1297 // the application level, if not at the level of TCP).
1299 // NOTE that IIOP will still benefit from TCP's orderly disconnect.
1301 TAO_GIOP_Message_Base::send_error (TAO_Transport *transport)
1303 const char error_message [TAO_GIOP_MESSAGE_HEADER_LEN] =
1305 // The following works on non-ASCII platforms, such as MVS (which
1306 // uses EBCDIC).
1307 0x47, // 'G'
1308 0x49, // 'I'
1309 0x4f, // 'O'
1310 0x50, // 'P'
1311 (CORBA::Octet) 1, // Use the lowest GIOP version
1312 (CORBA::Octet) 0,
1313 TAO_ENCAP_BYTE_ORDER,
1314 GIOP::MessageError,
1315 0, 0, 0, 0
1318 if (TAO_debug_level > 9)
1320 this->dump_msg ("send_error",
1321 reinterpret_cast <const u_char *> (error_message),
1322 TAO_GIOP_MESSAGE_HEADER_LEN);
1325 ACE_Data_Block data_block (TAO_GIOP_MESSAGE_HEADER_LEN,
1326 ACE_Message_Block::MB_DATA,
1327 error_message,
1328 nullptr,
1329 nullptr,
1330 ACE_Message_Block::DONT_DELETE,
1331 nullptr);
1332 ACE_Message_Block message_block(&data_block,
1333 ACE_Message_Block::DONT_DELETE);
1334 message_block.wr_ptr (TAO_GIOP_MESSAGE_HEADER_LEN);
1336 size_t bt;
1337 int const result = transport->send_message_block_chain (&message_block, bt);
1338 if (result == -1)
1340 if (TAO_debug_level > 0)
1341 TAOLIB_DEBUG ((LM_DEBUG,
1342 ACE_TEXT ("TAO (%N|%l|%P|%t) error sending error to transport %u\n"),
1343 transport->id ()));
1346 return result;
1349 TAO_GIOP_Message_Generator_Parser*
1350 TAO_GIOP_Message_Base::get_parser (
1351 const TAO_GIOP_Message_Version &version) const
1353 switch (version.major)
1355 case 1:
1356 switch (version.minor)
1358 case 0:
1359 return
1360 const_cast<TAO_GIOP_Message_Generator_Parser_10 *> (
1361 &this->tao_giop_impl_.tao_giop_10);
1362 break;
1363 case 1:
1364 return
1365 const_cast<TAO_GIOP_Message_Generator_Parser_11 *> (
1366 &this->tao_giop_impl_.tao_giop_11);
1367 break;
1368 case 2:
1369 return
1370 const_cast<TAO_GIOP_Message_Generator_Parser_12 *> (
1371 &this->tao_giop_impl_.tao_giop_12);
1372 break;
1373 default:
1374 throw ::CORBA::INTERNAL (0, CORBA::COMPLETED_NO);
1375 break;
1377 break;
1378 default:
1379 throw ::CORBA::INTERNAL (0, CORBA::COMPLETED_NO);
1380 break;
1384 // Server sends an "I'm shutting down now, any requests you've sent me
1385 // can be retried" message to the server. The message is prefab, for
1386 // simplicity.
1388 // NOTE: this is IIOP-specific though it doesn't look like it is. It
1389 // relies on a TCP-ism: orderly disconnect, which doesn't exist in all
1390 // transport protocols. Versions of GIOP atop some transport that's
1391 // lacking orderly disconnect must define some transport-specific
1392 // handshaking (e.g. the XNS/SPP handshake convention) in order to
1393 // know that the same transport semantics are provided when shutdown
1394 // is begun with messages "in flight". (IIOP doesn't report false
1395 // errors in the case of "clean shutdown", because it relies on
1396 // orderly disconnect as provided by TCP. This quality of service is
1397 // required to write robust distributed systems.)
1398 void
1399 TAO_GIOP_Message_Base::
1400 send_close_connection (const TAO_GIOP_Message_Version &version,
1401 TAO_Transport *transport)
1403 // static CORBA::Octet
1404 // I hate this in every method. Till the time I figure out a way
1405 // around I will have them here hanging around.
1406 const char close_message [TAO_GIOP_MESSAGE_HEADER_LEN] =
1408 // The following works on non-ASCII platforms, such as MVS (which
1409 // uses EBCDIC).
1410 0x47, // 'G'
1411 0x49, // 'I'
1412 0x4f, // 'O'
1413 0x50, // 'P'
1414 static_cast<char> (version.major),
1415 static_cast<char> (version.minor),
1416 TAO_ENCAP_BYTE_ORDER,
1417 GIOP::CloseConnection,
1418 0, 0, 0, 0
1421 // It's important that we use a reliable shutdown after we send this
1422 // message, so we know it's received.
1424 // @@ should recv and discard queued data for portability; note
1425 // that this won't block (long) since we never set SO_LINGER
1426 if (TAO_debug_level > 9)
1428 this->dump_msg ("send_close_connection",
1429 reinterpret_cast <const u_char *> (close_message),
1430 TAO_GIOP_MESSAGE_HEADER_LEN);
1433 ACE_Data_Block data_block (TAO_GIOP_MESSAGE_HEADER_LEN,
1434 ACE_Message_Block::MB_DATA,
1435 close_message,
1436 nullptr,
1437 nullptr,
1438 ACE_Message_Block::DONT_DELETE,
1439 nullptr);
1440 ACE_Message_Block message_block(&data_block);
1441 message_block.wr_ptr (TAO_GIOP_MESSAGE_HEADER_LEN);
1443 size_t bt;
1444 int const result = transport->send_message_block_chain (&message_block, bt);
1445 if (result == -1)
1447 if (TAO_debug_level > 0)
1448 TAOLIB_ERROR ((LM_ERROR,
1449 ACE_TEXT ("(%P|%t) error closing connection %u, errno = %d\n"),
1450 transport->id (), ACE_ERRNO_GET));
1453 transport->close_connection ();
1454 TAOLIB_DEBUG ((LM_DEBUG,
1455 ACE_TEXT ("(%P|%t) shut down transport, handle %d\n"),
1456 transport-> id ()));
1460 TAO_GIOP_Message_Base::send_reply_exception (
1461 TAO_Transport *transport,
1462 TAO_OutputCDR &output,
1463 CORBA::ULong request_id,
1464 IOP::ServiceContextList *svc_info,
1465 CORBA::Exception *x
1468 TAO_Pluggable_Reply_Params_Base reply_params;
1469 reply_params.request_id_ = request_id;
1470 reply_params.svc_ctx_.length (0);
1472 // We are going to send some data
1473 reply_params.argument_flag_ = true;
1475 // Send back the service context we received. (RTCORBA relies on
1476 // this).
1477 reply_params.service_context_notowned (svc_info);
1479 if (CORBA::SystemException::_downcast (x) != nullptr)
1481 reply_params.reply_status (GIOP::SYSTEM_EXCEPTION);
1483 else
1485 reply_params.reply_status (GIOP::USER_EXCEPTION);
1488 if (this->generate_exception_reply (output, reply_params, *x) == -1)
1489 return -1;
1491 output.more_fragments (false);
1493 return transport->send_message (output, nullptr, nullptr, TAO_Message_Semantics (TAO_Message_Semantics::TAO_REPLY));
1496 void
1497 TAO_GIOP_Message_Base::dump_msg (const char *label,
1498 const u_char *ptr,
1499 size_t len)
1501 if (TAO_debug_level < 10)
1503 return;
1506 static const char digits[] = "0123456789ABCD";
1507 static const char *names[] =
1509 "Request",
1510 "Reply",
1511 "CancelRequest",
1512 "LocateRequest",
1513 "LocateReply",
1514 "CloseConnection",
1515 "MessageError",
1516 "Fragment"
1519 // Message name.
1520 const char *message_name = "UNKNOWN MESSAGE";
1521 u_long slot = ptr[TAO_GIOP_MESSAGE_TYPE_OFFSET];
1522 if (slot < sizeof (names) / sizeof (names[0]))
1523 message_name = names[slot];
1525 // Byte order.
1526 int const byte_order = ptr[TAO_GIOP_MESSAGE_FLAGS_OFFSET] & 0x01;
1528 // Get the version info
1529 CORBA::Octet const major = ptr[TAO_GIOP_VERSION_MAJOR_OFFSET];
1530 CORBA::Octet const minor = ptr[TAO_GIOP_VERSION_MINOR_OFFSET];
1532 // request/reply id.
1533 CORBA::ULong tmp = 0;
1534 CORBA::ULong *id = std::addressof(tmp);
1535 char *tmp_id = nullptr;
1537 if (ptr[TAO_GIOP_MESSAGE_TYPE_OFFSET] == GIOP::Request ||
1538 ptr[TAO_GIOP_MESSAGE_TYPE_OFFSET] == GIOP::Reply ||
1539 ptr[TAO_GIOP_MESSAGE_TYPE_OFFSET] == GIOP::Fragment)
1541 if (major == 1 && minor < 2)
1543 // @@ Only works if ServiceContextList is empty....
1544 tmp_id = (char * ) (ptr + TAO_GIOP_MESSAGE_HEADER_LEN + 4);
1546 else
1548 tmp_id = (char * ) (ptr + TAO_GIOP_MESSAGE_HEADER_LEN);
1550 #if !defined (ACE_DISABLE_SWAP_ON_READ)
1551 if (byte_order == TAO_ENCAP_BYTE_ORDER)
1553 id = reinterpret_cast <ACE_CDR::ULong*> (tmp_id);
1555 else
1557 ACE_CDR::swap_4 (tmp_id, reinterpret_cast <char*> (id));
1559 #else
1560 id = reinterpret_cast <ACE_CDR::ULong*> (tmp_id);
1561 #endif /* ACE_DISABLE_SWAP_ON_READ */
1564 else if (ptr[TAO_GIOP_MESSAGE_TYPE_OFFSET] == GIOP::CancelRequest ||
1565 ptr[TAO_GIOP_MESSAGE_TYPE_OFFSET] == GIOP::LocateRequest ||
1566 ptr[TAO_GIOP_MESSAGE_TYPE_OFFSET] == GIOP::LocateReply)
1568 tmp_id = (char * ) (ptr + TAO_GIOP_MESSAGE_HEADER_LEN);
1569 #if !defined (ACE_DISABLE_SWAP_ON_READ)
1570 if (byte_order == TAO_ENCAP_BYTE_ORDER)
1572 id = reinterpret_cast <ACE_CDR::ULong*> (tmp_id);
1574 else
1576 ACE_CDR::swap_4 (tmp_id, reinterpret_cast <char*> (id));
1578 #else
1579 id = reinterpret_cast <ACE_CDR::ULong*> (tmp_id);
1580 #endif /* ACE_DISABLE_SWAP_ON_READ */
1583 // Print.
1584 TAOLIB_DEBUG ((LM_DEBUG,
1585 ACE_TEXT("TAO (%P|%t) - GIOP_Message_Base::dump_msg, ")
1586 ACE_TEXT("%C GIOP message v%c.%c, %d data bytes, %s endian, ")
1587 ACE_TEXT("Type %C[%u]\n"),
1588 label,
1589 digits[ptr[TAO_GIOP_VERSION_MAJOR_OFFSET]],
1590 digits[ptr[TAO_GIOP_VERSION_MINOR_OFFSET]],
1591 len - TAO_GIOP_MESSAGE_HEADER_LEN ,
1592 (byte_order == TAO_ENCAP_BYTE_ORDER) ? ACE_TEXT("my") : ACE_TEXT("other"),
1593 message_name,
1594 *id));
1595 TAOLIB_HEX_DUMP ((LM_DEBUG,
1596 (const char *) ptr,
1597 len,
1598 ACE_TEXT ("GIOP message")));
1602 TAO_GIOP_Message_Base::generate_locate_reply_header (
1603 TAO_OutputCDR & /*cdr*/,
1604 TAO_Pluggable_Reply_Params_Base & /*params*/)
1606 return 0;
1609 bool
1610 TAO_GIOP_Message_Base::is_ready_for_bidirectional (TAO_OutputCDR &msg) const
1612 TAO_GIOP_Message_Version giop_version;
1614 msg.get_version (giop_version);
1616 // Get the parser we need to use
1617 TAO_GIOP_Message_Generator_Parser *generator_parser = this->get_parser (giop_version);
1619 // We dont really know.. So ask the generator and parser objects that
1620 // we know.
1621 // @@ TODO: Need to make this faster, instead of making virtual
1622 // call, try todo the check within this class
1623 return generator_parser->is_ready_for_bidirectional ();
1626 TAO_Queued_Data *
1627 TAO_GIOP_Message_Base::make_queued_data (size_t sz)
1629 // Make a datablock for the size requested + something. The
1630 // "something" is required because we are going to align the data
1631 // block in the message block. During alignment we could loose some
1632 // bytes. As we may not know how many bytes will be lost, we will
1633 // allocate ACE_CDR::MAX_ALIGNMENT extra.
1634 ACE_Data_Block *db =
1635 this->orb_core_->create_input_cdr_data_block (sz +
1636 ACE_CDR::MAX_ALIGNMENT);
1638 TAO_Queued_Data *qd =
1639 TAO_Queued_Data::make_queued_data (
1640 this->orb_core_->transport_message_buffer_allocator (),
1641 this->orb_core_->input_cdr_msgblock_allocator (),
1642 db);
1644 if (qd == nullptr)
1646 if (TAO_debug_level > 0)
1648 TAOLIB_ERROR ((LM_ERROR,
1649 ACE_TEXT ("TAO (%P|%t) - TAO_GIOP_Message_Base::make_queued_data, ")
1650 ACE_TEXT ("out of memory, failed to allocate queued data object\n")));
1652 db->release ();
1653 return nullptr; // NULL pointer
1656 return qd;
1659 size_t
1660 TAO_GIOP_Message_Base::header_length () const
1662 return TAO_GIOP_MESSAGE_HEADER_LEN;
1665 size_t
1666 TAO_GIOP_Message_Base::fragment_header_length (
1667 const TAO_GIOP_Message_Version& giop_version) const
1669 // Get the parser we need to use
1670 TAO_GIOP_Message_Generator_Parser *generator_parser =
1671 this->get_parser (giop_version);
1673 return generator_parser->fragment_header_length ();
1677 TAO_GIOP_Message_Base::parse_request_id (const TAO_Queued_Data *qd,
1678 CORBA::ULong &request_id) const
1680 // Get the read and write positions before we steal data.
1681 size_t rd_pos = qd->msg_block ()->rd_ptr () - qd->msg_block ()->base ();
1682 size_t wr_pos = qd->msg_block ()->wr_ptr () - qd->msg_block ()->base ();
1683 rd_pos += TAO_GIOP_MESSAGE_HEADER_LEN;
1685 // Create a input CDR stream. We do the following
1686 // 1 - If the incoming message block has a data block with a flag
1687 // DONT_DELETE (for the data block) we create an input CDR
1688 // stream the same way.
1689 // 2 - If the incoming message block had a datablock from heap just
1690 // use it by duplicating it and make the flag 0.
1691 // NOTE: We use the same data block in which we read the message and
1692 // we pass it on to the higher layers of the ORB. So we dont to any
1693 // copies at all here. The same is also done in the higher layers.
1695 ACE_Message_Block::Message_Flags flg = 0;
1696 ACE_Data_Block *db = nullptr;
1698 // Get the flag in the message block
1699 flg = qd->msg_block ()->self_flags ();
1701 if (ACE_BIT_ENABLED (flg, ACE_Message_Block::DONT_DELETE))
1703 // Use the same datablock
1704 db = qd->msg_block ()->data_block ();
1706 else
1708 // Use a duplicated datablock as the datablock has come off the
1709 // heap.
1710 db = qd->msg_block ()->data_block ()->duplicate ();
1713 TAO_InputCDR input_cdr (db,
1714 flg,
1715 rd_pos,
1716 wr_pos,
1717 qd->byte_order (),
1718 qd->giop_version ().major_version (),
1719 qd->giop_version ().minor_version (),
1720 this->orb_core_);
1722 if (qd->giop_version ().major == 1 &&
1723 (qd->giop_version ().minor == 0 || qd->giop_version ().minor == 1))
1725 switch (qd->msg_type ())
1727 case GIOP::Request:
1728 case GIOP::Reply:
1730 IOP::ServiceContextList service_context;
1732 if ((input_cdr >> service_context)
1733 && (input_cdr >> request_id))
1735 return 0;
1738 break;
1739 case GIOP::CancelRequest:
1740 case GIOP::LocateRequest:
1741 case GIOP::LocateReply:
1743 if ((input_cdr >> request_id))
1745 return 0;
1748 break;
1749 default:
1750 break;
1753 else
1755 switch (qd->msg_type ())
1757 case GIOP::Request:
1758 case GIOP::Reply:
1759 case GIOP::Fragment:
1760 case GIOP::CancelRequest:
1761 case GIOP::LocateRequest:
1762 case GIOP::LocateReply:
1764 // Dealing with GIOP-1.2, the request-id is located directly
1765 // behind the GIOP-Header. This is true for all message
1766 // types that might be sent in form of fragments or
1767 // cancel-requests.
1768 if ((input_cdr >> request_id))
1770 return 0;
1773 break;
1774 default:
1775 break;
1779 return -1;
1782 /* @return -1 error, 0 ok, +1 outstanding fragments */
1784 TAO_GIOP_Message_Base::consolidate_fragmented_message (
1785 TAO_Queued_Data * qd,
1786 TAO_Queued_Data *& msg)
1788 TAO::Incoming_Message_Stack reverse_stack;
1790 TAO_Queued_Data *tail = nullptr;
1791 TAO_Queued_Data *head = nullptr;
1794 // CONSOLIDATE FRAGMENTED MESSAGE
1797 // check for error-condition
1798 if (qd == nullptr)
1800 return -1;
1803 if (qd->giop_version ().major == 1 && qd->giop_version ().minor == 0)
1805 TAO_Queued_Data::release (qd);
1806 return -1; // error: GIOP-1.0 does not support fragments
1809 // If this is not the last fragment, push it onto stack for later processing
1810 if (qd->more_fragments ())
1812 this->fragment_stack_.push (qd);
1814 msg = nullptr; // no consolidated message available yet
1815 return 1; // status: more messages expected.
1818 tail = qd; // init
1820 // Add the current message block to the end of the chain
1821 // after adjusting the read pointer to skip the header(s)
1822 size_t const header_adjustment =
1823 this->header_length () +
1824 this->fragment_header_length (tail->giop_version ().major_version ());
1826 if (tail->msg_block ()->length () < header_adjustment)
1828 // buffer length not sufficient
1829 TAO_Queued_Data::release (qd);
1830 return -1;
1833 // duplicate code to speed up both processes, for GIOP-1.1 and GIOP-1.2
1834 if (tail->giop_version ().major_version () == 1 && tail->giop_version ().minor_version () == 1)
1836 // GIOP-1.1
1838 while (this->fragment_stack_.pop (head) != -1)
1840 if (head->more_fragments () &&
1841 head->giop_version ().major_version () == 1 &&
1842 head->giop_version ().minor_version () == 1 &&
1843 head->msg_block ()->length () >= header_adjustment)
1845 // adjust the read-pointer, skip the fragment header
1846 tail->msg_block ()->rd_ptr(header_adjustment);
1848 head->msg_block ()->cont (tail->msg_block ());
1850 tail->msg_block (nullptr);
1852 TAO_Queued_Data::release (tail);
1854 tail = head;
1856 else
1858 reverse_stack.push (head);
1862 else
1864 // > GIOP-1.2
1866 CORBA::ULong tmp_request_id = 0;
1867 if (this->parse_request_id (tail, tmp_request_id) == -1)
1869 return -1;
1872 CORBA::ULong const request_id = tmp_request_id;
1874 while (this->fragment_stack_.pop (head) != -1)
1876 CORBA::ULong head_request_id = 0;
1877 int parse_status = 0;
1879 if (head->more_fragments () &&
1880 head->giop_version ().major_version () >= 1 &&
1881 head->giop_version ().minor_version () >= 2 &&
1882 head->msg_block ()->length () >= header_adjustment &&
1883 (parse_status = this->parse_request_id (head, head_request_id)) != -1 &&
1884 request_id == head_request_id)
1886 // adjust the read-pointer, skip the fragment header
1887 tail->msg_block ()->rd_ptr(header_adjustment);
1889 head->msg_block ()->cont (tail->msg_block ());
1891 tail->msg_block (nullptr);
1893 TAO_Queued_Data::release (tail);
1895 tail = head;
1897 else
1899 if (parse_status == -1)
1901 TAO_Queued_Data::release (head);
1902 return -1;
1905 reverse_stack.push (head);
1910 // restore stack
1911 while (reverse_stack.pop (head) != -1)
1913 this->fragment_stack_.push (head);
1916 if (tail->consolidate () == -1)
1918 // memory allocation failed
1919 TAO_Queued_Data::release (tail);
1920 return -1;
1923 // set out value
1924 msg = tail;
1926 return 0;
1930 TAO_GIOP_Message_Base::discard_fragmented_message (const TAO_Queued_Data *cancel_request)
1932 // We must extract the specific request-id from message-buffer
1933 // and remove all fragments from stack that match this request-id.
1935 TAO::Incoming_Message_Stack reverse_stack;
1937 CORBA::ULong cancel_request_id;
1939 if (this->parse_request_id (cancel_request, cancel_request_id) == -1)
1941 return -1;
1944 TAO_Queued_Data *head = nullptr;
1946 // Revert stack
1947 while (this->fragment_stack_.pop (head) != -1)
1949 reverse_stack.push (head);
1952 bool discard_all_GIOP11_messages = false;
1954 // Now we are able to process message in order they have arrived.
1955 // If the cancel_request_id matches to GIOP-1.1 message, all succeeding
1956 // fragments belong to this message and must be discarded.
1957 // Note: GIOP-1.1 fragment header dont have any request-id encoded. If the
1958 // cancel_request_id matches GIOP-1.2 messages, all GIOP-1.2 fragments
1959 // having encoded the request id will be discarded.
1960 while (reverse_stack.pop (head) != -1)
1962 CORBA::ULong head_request_id;
1964 if (head->giop_version ().major_version () == 1 &&
1965 head->giop_version ().minor_version () <= 1 &&
1966 head->msg_type () != GIOP::Fragment && // GIOP11 fragment does not provide request id
1967 this->parse_request_id (head, head_request_id) >= 0 &&
1968 cancel_request_id == head_request_id)
1970 TAO_Queued_Data::release (head);
1971 discard_all_GIOP11_messages = true;
1973 else if (head->giop_version ().major_version () == 1 &&
1974 head->giop_version ().minor_version () <= 1 &&
1975 discard_all_GIOP11_messages)
1977 TAO_Queued_Data::release (head);
1979 else if (head->giop_version ().major_version () >= 1 &&
1980 head->giop_version ().minor_version () >= 2 &&
1981 this->parse_request_id (head, head_request_id) >= 0 &&
1982 cancel_request_id == head_request_id)
1984 TAO_Queued_Data::release (head);
1986 else
1988 this->fragment_stack_.push (head);
1992 return 0;
1995 TAO_GIOP_Fragmentation_Strategy *
1996 TAO_GIOP_Message_Base::fragmentation_strategy ()
1998 return this->fragmentation_strategy_;
2001 void
2002 TAO_GIOP_Message_Base::set_giop_flags (TAO_OutputCDR & msg) const
2004 CORBA::Octet * const buf =
2005 reinterpret_cast<CORBA::Octet *> (const_cast<char *> (msg.buffer ()));
2007 CORBA::Octet const & major = buf[TAO_GIOP_VERSION_MAJOR_OFFSET];
2008 CORBA::Octet const & minor = buf[TAO_GIOP_VERSION_MINOR_OFFSET];
2010 // Flags for the GIOP protocol header "flags" field.
2011 CORBA::Octet & flags = buf[TAO_GIOP_MESSAGE_FLAGS_OFFSET];
2013 // Least significant bit: Byte order
2014 ACE_SET_BITS (flags, TAO_ENCAP_BYTE_ORDER ^ msg.do_byte_swap ());
2016 // Second least significant bit: More fragments
2018 // Only supported in GIOP 1.1 or better.
2019 if (!(major <= 1 && minor == 0))
2020 ACE_SET_BITS (flags, msg.more_fragments () << 1);
2023 TAO_END_VERSIONED_NAMESPACE_DECL