Cleanup ACE_HAS_PTHREAD_SIGMASK_PROTOTYPE, all platforms support it so far as I can...
[ACE_TAO.git] / ACE / ace / UPIPE_Stream.cpp
blob49c1fa24d51524ade66b59a2339adae221cbad3d
1 #include "ace/UPIPE_Stream.h"
3 #if defined (ACE_HAS_THREADS)
5 #include "ace/OS_NS_string.h"
7 #if !defined (__ACE_INLINE__)
8 #include "ace/UPIPE_Stream.inl"
9 #endif /* __ACE_INLINE__ */
11 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
13 ACE_ALLOC_HOOK_DEFINE(ACE_UPIPE_Stream)
15 ACE_UPIPE_Stream::ACE_UPIPE_Stream ()
16 : mb_last_ (0),
17 reference_count_ (0)
19 ACE_TRACE ("ACE_UPIPE_Stream::ACE_UPIPE_STREAM");
22 ACE_UPIPE_Stream::~ACE_UPIPE_Stream ()
24 if (this->mb_last_ != 0)
26 this->mb_last_->release ();
27 this->mb_last_ = 0;
31 int
32 ACE_UPIPE_Stream::control (int cmd,
33 void * val) const
35 ACE_TRACE ("ACE_UPIPE_Stream::control");
37 return ((ACE_UPIPE_Stream *) this)->stream_.control
38 ((ACE_IO_Cntl_Msg::ACE_IO_Cntl_Cmds) cmd, val);
41 void
42 ACE_UPIPE_Stream::dump () const
44 #if defined (ACE_HAS_DUMP)
45 ACE_TRACE ("ACE_UPIPE_Stream::dump");
46 #endif /* ACE_HAS_DUMP */
49 int
50 ACE_UPIPE_Stream::close ()
52 ACE_TRACE ("ACE_UPIPE_Stream::close");
53 ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1));
55 this->reference_count_--;
57 if (this->reference_count_ == 0)
59 // Since the UPIPE should have been closed earlier we won't bother
60 // checking to see if closing it now fails.
62 if (this->ACE_SPIPE::get_handle () != ACE_INVALID_HANDLE)
63 this->ACE_SPIPE::close ();
65 // Close down the ACE_stream.
66 return this->stream_.close ();
68 return 0;
71 int
72 ACE_UPIPE_Stream::get_remote_addr (ACE_UPIPE_Addr &remote_sap) const
74 ACE_TRACE ("ACE_UPIPE_Stream::get_remote_addr");
75 remote_sap = this->remote_addr_;
76 return 0;
79 int
80 ACE_UPIPE_Stream::send (ACE_Message_Block *mb_p,
81 ACE_Time_Value *timeout)
83 ACE_TRACE ("ACE_UPIPE_Stream::send_msg");
84 return this->stream_.put (mb_p, timeout) == -1 ? -1 : 0;
87 int ACE_UPIPE_Stream::recv (ACE_Message_Block *& mb_p,
88 ACE_Time_Value *timeout)
90 return this->stream_.get (mb_p, timeout) == -1 ? -1 : 0;
93 // Send a buffer.
95 ssize_t
96 ACE_UPIPE_Stream::send (const char *buffer,
97 size_t n,
98 ACE_Time_Value *timeout)
100 ACE_TRACE ("ACE_UPIPE_Stream::send");
102 ACE_Message_Block *mb_p;
103 ACE_NEW_RETURN (mb_p,
104 ACE_Message_Block (n),
105 -1);
106 mb_p->copy (buffer, n);
107 return
108 this->stream_.put (mb_p, timeout) == -1
109 ? -1
110 : static_cast<ssize_t> (n);
113 // Receive a buffer.
115 ssize_t
116 ACE_UPIPE_Stream::recv (char *buffer,
117 size_t n,
118 ACE_Time_Value *timeout)
120 ACE_TRACE ("ACE_UPIPE_Stream::recv");
121 // Index in buffer.
122 size_t bytes_read = 0;
124 while (bytes_read < n)
125 if (this->mb_last_ != 0)
127 // We have remaining data in our last read Message_Buffer.
128 size_t this_len = this->mb_last_->length ();
129 if (this_len < n)
131 // The remaining data is not enough.
133 ACE_OS::memcpy ((void *) &buffer[bytes_read],
134 this->mb_last_->rd_ptr (),
135 this_len);
136 bytes_read += this_len;
137 this->mb_last_ = this->mb_last_->release (); // mb_last_ now 0
138 return static_cast<ssize_t> (bytes_read);
140 else
142 // The remaining data is at least enough. If there's
143 // more, we'll get it the next time through.
144 ACE_OS::memcpy (&buffer[bytes_read],
145 this->mb_last_->rd_ptr (),
147 bytes_read += n;
149 // Advance rd_ptr.
150 this->mb_last_->rd_ptr (n);
152 if (this->mb_last_->length () == 0)
153 // Now the Message_Buffer is empty.
154 this->mb_last_ = this->mb_last_->release ();
157 else
159 // We have to get a new Message_Buffer from our stream.
160 int result = this->stream_.get (this->mb_last_, timeout);
162 if (result == -1)
164 if (errno == EWOULDBLOCK && bytes_read > 0)
165 // Return the number of bytes read before we timed out.
166 return static_cast<ssize_t> (bytes_read);
167 else
168 return -1;
172 return static_cast<ssize_t> (bytes_read);
175 ssize_t
176 ACE_UPIPE_Stream::send_n (const char *buf,
177 size_t n,
178 ACE_Time_Value *timeout)
180 ACE_TRACE ("ACE_UPIPE_Stream::send_n");
182 size_t bytes_written;
183 ssize_t len = 0;
185 for (bytes_written = 0; bytes_written < n; bytes_written += len)
187 len = this->send (buf + bytes_written,
188 n - bytes_written,
189 timeout);
191 if (len == -1)
193 return -1;
197 return static_cast<ssize_t> (bytes_written);
200 ssize_t
201 ACE_UPIPE_Stream::recv_n (char *buf,
202 size_t n,
203 ACE_Time_Value *timeout)
205 ACE_TRACE ("ACE_UPIPE_Stream::recv_n");
206 size_t bytes_read;
207 ssize_t len = 0;
209 for (bytes_read = 0; bytes_read < n; bytes_read += len)
211 len = this->recv (buf + bytes_read,
212 n - bytes_read,
213 timeout);
215 if (len == -1)
217 return -1;
219 else if (len == 0)
221 break;
225 return static_cast< ssize_t> (bytes_read);
228 ACE_END_VERSIONED_NAMESPACE_DECL
230 #endif /* ACE_HAS_THREADS */