Merge pull request #2218 from jwillemsen/jwi-pthreadsigmask
[ACE_TAO.git] / TAO / tao / Asynch_Queued_Message.cpp
blobcb519752e1d4afd7e7b96e3952254310dcda9441
1 // -*- C++ -*-
2 #include "tao/Asynch_Queued_Message.h"
3 #include "tao/debug.h"
4 #include "tao/ORB_Core.h"
6 #include "ace/OS_Memory.h"
7 #include "ace/OS_NS_string.h"
8 #include "ace/os_include/sys/os_uio.h"
9 #include "ace/Log_Msg.h"
10 #include "ace/Message_Block.h"
11 #include "ace/Malloc_Base.h"
12 #include "ace/High_Res_Timer.h"
14 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
16 TAO_Asynch_Queued_Message::TAO_Asynch_Queued_Message (
17 const ACE_Message_Block *contents,
18 TAO_ORB_Core *oc,
19 ACE_Time_Value *timeout,
20 ACE_Allocator *alloc,
21 bool is_heap_allocated)
22 : TAO_Queued_Message (oc, alloc, is_heap_allocated)
23 , size_ (contents->total_length ())
24 , offset_ (0)
25 , abs_timeout_ (ACE_Time_Value::zero)
27 if (timeout != nullptr)// && *timeout != ACE_Time_Value::zero)
29 this->abs_timeout_ = ACE_High_Res_Timer::gettimeofday_hr () + *timeout;
31 // @@ Use a pool for these guys!!
32 ACE_NEW (this->buffer_, char[this->size_]);
34 size_t copy_offset = 0;
35 for (const ACE_Message_Block *i = contents;
36 i != nullptr;
37 i = i->cont ())
39 ACE_OS::memcpy (this->buffer_ + copy_offset,
40 i->rd_ptr (),
41 i->length ());
42 copy_offset += i->length ();
46 TAO_Asynch_Queued_Message::TAO_Asynch_Queued_Message (char *buf,
47 TAO_ORB_Core *oc,
48 size_t size,
49 const ACE_Time_Value &abs_timeout,
50 ACE_Allocator *alloc,
51 bool is_heap_allocated)
52 : TAO_Queued_Message (oc, alloc, is_heap_allocated)
53 , size_ (size)
54 , offset_ (0)
55 , buffer_ (buf)
56 , abs_timeout_ (abs_timeout)
60 TAO_Asynch_Queued_Message::~TAO_Asynch_Queued_Message ()
62 // @@ Use a pool for these guys!
63 delete [] this->buffer_;
66 size_t
67 TAO_Asynch_Queued_Message::message_length () const
69 return this->size_ - this->offset_;
72 int
73 TAO_Asynch_Queued_Message::all_data_sent () const
75 return this->size_ == this->offset_;
78 void
79 TAO_Asynch_Queued_Message::fill_iov (int iovcnt_max,
80 int &iovcnt,
81 iovec iov[]) const
83 ACE_ASSERT (iovcnt_max > iovcnt);
84 ACE_UNUSED_ARG (iovcnt_max); // not used if ACE_ASSERT() is empty
86 iov[iovcnt].iov_base = this->buffer_ + this->offset_;
87 iov[iovcnt].iov_len = static_cast<u_long> (this->size_ - this->offset_);
88 ++iovcnt;
91 void
92 TAO_Asynch_Queued_Message::bytes_transferred (size_t &byte_count)
94 this->state_changed_i (TAO_LF_Event::LFS_ACTIVE);
96 size_t const remaining_bytes = this->size_ - this->offset_;
97 if (byte_count > remaining_bytes)
99 this->offset_ = this->size_;
100 byte_count -= remaining_bytes;
101 return;
103 this->offset_ += byte_count;
104 byte_count = 0;
106 if (this->all_data_sent ())
107 this->state_changed (TAO_LF_Event::LFS_SUCCESS,
108 this->orb_core_->leader_follower ());
112 TAO_Queued_Message *
113 TAO_Asynch_Queued_Message::clone (ACE_Allocator *alloc)
115 char *buf = nullptr;
117 // @todo: Need to use a memory pool. But certain things need to
118 // change a bit in this class for that. Till then.
120 // Just allocate and copy data that needs to be sent, no point
121 // copying the whole buffer.
122 size_t const sz = this->size_ - this->offset_;
124 ACE_NEW_RETURN (buf,
125 char[sz],
126 nullptr);
128 ACE_OS::memcpy (buf,
129 this->buffer_ + this->offset_,
130 sz);
132 TAO_Asynch_Queued_Message *qm = nullptr;
134 if (alloc)
136 ACE_NEW_MALLOC_RETURN (qm,
137 static_cast<TAO_Asynch_Queued_Message *> (
138 alloc->malloc (sizeof (TAO_Asynch_Queued_Message))),
139 TAO_Asynch_Queued_Message (buf,
140 this->orb_core_,
142 this->abs_timeout_,
143 alloc,
144 true),
145 nullptr);
147 else
149 // No allocator, so use the common heap!
150 if (TAO_debug_level == 4)
152 // This debug is for testing purposes!
153 TAOLIB_DEBUG ((LM_DEBUG,
154 "TAO (%P|%t) - Asynch_Queued_Message::clone\n"
155 "Using global pool for allocation\n"));
158 ACE_NEW_RETURN (qm,
159 TAO_Asynch_Queued_Message (buf,
160 this->orb_core_,
162 this->abs_timeout_,
163 nullptr,
164 true),
165 nullptr);
168 return qm;
171 void
172 TAO_Asynch_Queued_Message::destroy ()
174 if (this->is_heap_created_)
176 // If we have an allocator release the memory to the allocator
177 // pool.
178 if (this->allocator_)
180 ACE_DES_FREE_THIS (this->allocator_->free,
181 TAO_Asynch_Queued_Message);
183 else // global release..
185 delete this;
190 bool
191 TAO_Asynch_Queued_Message::is_expired (const ACE_Time_Value &now) const
193 if (this->abs_timeout_ > ACE_Time_Value::zero)
195 if (this->offset_ > 0)
197 return false; //never expire partial messages
199 return this->abs_timeout_ < now;
201 return false;
204 void
205 TAO_Asynch_Queued_Message::copy_if_necessary (const ACE_Message_Block*)
207 // It's never necessary for asynchronously queued messages
210 TAO_END_VERSIONED_NAMESPACE_DECL