Merge pull request #2218 from jwillemsen/jwi-pthreadsigmask
[ACE_TAO.git] / TAO / tao / Synch_Queued_Message.cpp
blob4357b01f644990d3134e9dc006465376c0219726
1 // -*- C++ -*-
2 #include "tao/Synch_Queued_Message.h"
3 #include "tao/debug.h"
4 #include "tao/ORB_Core.h"
6 #include "ace/Malloc_T.h"
7 #include "ace/Message_Block.h"
9 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
11 TAO_Synch_Queued_Message::TAO_Synch_Queued_Message (
12 const ACE_Message_Block *contents,
13 TAO_ORB_Core *oc,
14 ACE_Allocator *alloc,
15 bool is_heap_allocated)
16 : TAO_Queued_Message (oc, alloc, is_heap_allocated)
17 , contents_ (const_cast<ACE_Message_Block*> (contents))
18 , current_block_ (contents_)
19 , own_contents_ (is_heap_allocated)
23 TAO_Synch_Queued_Message::~TAO_Synch_Queued_Message ()
25 if (this->own_contents_ && this->contents_ != nullptr)
27 ACE_Message_Block::release (this->contents_);
31 const ACE_Message_Block *
32 TAO_Synch_Queued_Message::current_block () const
34 return this->current_block_;
37 size_t
38 TAO_Synch_Queued_Message::message_length () const
40 if (this->current_block_ == nullptr)
42 return 0;
45 return this->current_block_->total_length ();
48 int
49 TAO_Synch_Queued_Message::all_data_sent () const
51 return this->current_block_ == nullptr;
54 void
55 TAO_Synch_Queued_Message::fill_iov (int iovcnt_max,
56 int &iovcnt,
57 iovec iov[]) const
59 ACE_ASSERT (iovcnt_max > iovcnt);
61 for (const ACE_Message_Block *message_block = this->current_block_;
62 message_block != nullptr && iovcnt < iovcnt_max;
63 message_block = message_block->cont ())
65 size_t const message_block_length = message_block->length ();
67 // Check if this block has any data to be sent.
68 if (message_block_length > 0)
70 // Collect the data in the iovec.
71 iov[iovcnt].iov_base = message_block->rd_ptr ();
72 iov[iovcnt].iov_len = static_cast<u_long> (message_block_length);
74 // Increment iovec counter.
75 ++iovcnt;
80 void
81 TAO_Synch_Queued_Message::bytes_transferred (size_t &byte_count)
83 this->state_changed_i (TAO_LF_Event::LFS_ACTIVE);
85 while (this->current_block_ != nullptr && byte_count > 0)
87 size_t const l = this->current_block_->length ();
89 if (byte_count < l)
91 this->current_block_->rd_ptr (byte_count);
92 byte_count = 0;
93 return;
96 byte_count -= l;
97 this->current_block_->rd_ptr (l);
98 this->current_block_ = this->current_block_->cont ();
100 while (this->current_block_ != nullptr
101 && this->current_block_->length () == 0)
103 this->current_block_ = this->current_block_->cont ();
107 if (this->current_block_ == nullptr)
108 this->state_changed (TAO_LF_Event::LFS_SUCCESS,
109 this->orb_core_->leader_follower ());
112 TAO_Queued_Message *
113 TAO_Synch_Queued_Message::clone (ACE_Allocator *alloc)
115 TAO_Synch_Queued_Message *qm = nullptr;
117 // Clone the message block.
118 // NOTE: We wantedly do the cloning from <current_block_> instead of
119 // starting from <contents_> since we dont want to clone blocks that
120 // have already been sent on the wire. Waste of memory and
121 // associated copying.
122 ACE_Message_Block *mb = this->current_block_->clone ();
124 if (alloc)
126 ACE_NEW_MALLOC_RETURN (qm,
127 static_cast<TAO_Synch_Queued_Message *> (
128 alloc->malloc (sizeof (TAO_Synch_Queued_Message))),
129 TAO_Synch_Queued_Message (mb,
130 this->orb_core_,
131 alloc,
132 true),
133 nullptr);
135 else
137 ACE_NEW_RETURN (qm,
138 TAO_Synch_Queued_Message (mb, this->orb_core_, nullptr, true),
139 nullptr);
142 return qm;
145 void
146 TAO_Synch_Queued_Message::destroy ()
148 if (this->own_contents_)
150 ACE_Message_Block::release (this->contents_);
151 this->current_block_ = nullptr;
152 this->contents_ = nullptr;
155 if (this->is_heap_created_)
157 // If we have an allocator release the memory to the allocator
158 // pool.
159 if (this->allocator_)
161 ACE_DES_FREE_THIS (this->allocator_->free,
162 TAO_Synch_Queued_Message);
164 else // global release..
166 delete this;
171 void
172 TAO_Synch_Queued_Message::copy_if_necessary (const ACE_Message_Block* chain)
174 if (!this->own_contents_)
176 // Go through the message block chain looking for the message block
177 // that matches our "current" message block.
178 for (const ACE_Message_Block* mb = chain; mb != nullptr; mb = mb->cont ())
180 if (mb == this->current_block_)
182 // Once we have found the message block, we need to
183 // clone the current block so that if another thread comes
184 // in and calls reset() on the output stream (via another
185 // invocation on the transport), it doesn't cause the rest
186 // of our message to be released.
187 this->own_contents_ = true;
188 this->contents_ = this->current_block_->clone ();
189 this->current_block_ = this->contents_;
190 break;
196 TAO_END_VERSIONED_NAMESPACE_DECL