2 #include "tao/Synch_Queued_Message.h"
4 #include "tao/ORB_Core.h"
6 #include "ace/Malloc_T.h"
7 #include "ace/Message_Block.h"
9 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
11 TAO_Synch_Queued_Message::TAO_Synch_Queued_Message (
12 const ACE_Message_Block
*contents
,
15 bool is_heap_allocated
)
16 : TAO_Queued_Message (oc
, alloc
, is_heap_allocated
)
17 , contents_ (const_cast<ACE_Message_Block
*> (contents
))
18 , current_block_ (contents_
)
19 , own_contents_ (is_heap_allocated
)
23 TAO_Synch_Queued_Message::~TAO_Synch_Queued_Message ()
25 if (this->own_contents_
&& this->contents_
!= nullptr)
27 ACE_Message_Block::release (this->contents_
);
31 const ACE_Message_Block
*
32 TAO_Synch_Queued_Message::current_block () const
34 return this->current_block_
;
38 TAO_Synch_Queued_Message::message_length () const
40 if (this->current_block_
== nullptr)
45 return this->current_block_
->total_length ();
49 TAO_Synch_Queued_Message::all_data_sent () const
51 return this->current_block_
== nullptr;
55 TAO_Synch_Queued_Message::fill_iov (int iovcnt_max
,
59 ACE_ASSERT (iovcnt_max
> iovcnt
);
61 for (const ACE_Message_Block
*message_block
= this->current_block_
;
62 message_block
!= nullptr && iovcnt
< iovcnt_max
;
63 message_block
= message_block
->cont ())
65 size_t const message_block_length
= message_block
->length ();
67 // Check if this block has any data to be sent.
68 if (message_block_length
> 0)
70 // Collect the data in the iovec.
71 iov
[iovcnt
].iov_base
= message_block
->rd_ptr ();
72 iov
[iovcnt
].iov_len
= static_cast<u_long
> (message_block_length
);
74 // Increment iovec counter.
81 TAO_Synch_Queued_Message::bytes_transferred (size_t &byte_count
)
83 this->state_changed_i (TAO_LF_Event::LFS_ACTIVE
);
85 while (this->current_block_
!= nullptr && byte_count
> 0)
87 size_t const l
= this->current_block_
->length ();
91 this->current_block_
->rd_ptr (byte_count
);
97 this->current_block_
->rd_ptr (l
);
98 this->current_block_
= this->current_block_
->cont ();
100 while (this->current_block_
!= nullptr
101 && this->current_block_
->length () == 0)
103 this->current_block_
= this->current_block_
->cont ();
107 if (this->current_block_
== nullptr)
108 this->state_changed (TAO_LF_Event::LFS_SUCCESS
,
109 this->orb_core_
->leader_follower ());
113 TAO_Synch_Queued_Message::clone (ACE_Allocator
*alloc
)
115 TAO_Synch_Queued_Message
*qm
= nullptr;
117 // Clone the message block.
118 // NOTE: We wantedly do the cloning from <current_block_> instead of
119 // starting from <contents_> since we dont want to clone blocks that
120 // have already been sent on the wire. Waste of memory and
121 // associated copying.
122 ACE_Message_Block
*mb
= this->current_block_
->clone ();
126 ACE_NEW_MALLOC_RETURN (qm
,
127 static_cast<TAO_Synch_Queued_Message
*> (
128 alloc
->malloc (sizeof (TAO_Synch_Queued_Message
))),
129 TAO_Synch_Queued_Message (mb
,
138 TAO_Synch_Queued_Message (mb
, this->orb_core_
, nullptr, true),
146 TAO_Synch_Queued_Message::destroy ()
148 if (this->own_contents_
)
150 ACE_Message_Block::release (this->contents_
);
151 this->current_block_
= nullptr;
152 this->contents_
= nullptr;
155 if (this->is_heap_created_
)
157 // If we have an allocator release the memory to the allocator
159 if (this->allocator_
)
161 ACE_DES_FREE_THIS (this->allocator_
->free
,
162 TAO_Synch_Queued_Message
);
164 else // global release..
172 TAO_Synch_Queued_Message::copy_if_necessary (const ACE_Message_Block
* chain
)
174 if (!this->own_contents_
)
176 // Go through the message block chain looking for the message block
177 // that matches our "current" message block.
178 for (const ACE_Message_Block
* mb
= chain
; mb
!= nullptr; mb
= mb
->cont ())
180 if (mb
== this->current_block_
)
182 // Once we have found the message block, we need to
183 // clone the current block so that if another thread comes
184 // in and calls reset() on the output stream (via another
185 // invocation on the transport), it doesn't cause the rest
186 // of our message to be released.
187 this->own_contents_
= true;
188 this->contents_
= this->current_block_
->clone ();
189 this->current_block_
= this->contents_
;
196 TAO_END_VERSIONED_NAMESPACE_DECL