Also use Objects as part of an operation but as a result don't generate Any operation...
[ACE_TAO.git] / ACE / ace / UPIPE_Stream.cpp
blob46d470ce792d7d111ba1c0297dad7f730d2b12c0
1 #include "ace/UPIPE_Stream.h"
5 #if defined (ACE_HAS_THREADS)
7 #include "ace/OS_NS_string.h"
9 #if !defined (__ACE_INLINE__)
10 #include "ace/UPIPE_Stream.inl"
11 #endif /* __ACE_INLINE__ */
13 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
15 ACE_ALLOC_HOOK_DEFINE(ACE_UPIPE_Stream)
17 ACE_UPIPE_Stream::ACE_UPIPE_Stream (void)
18 : mb_last_ (0),
19 reference_count_ (0)
21 ACE_TRACE ("ACE_UPIPE_Stream::ACE_UPIPE_STREAM");
24 ACE_UPIPE_Stream::~ACE_UPIPE_Stream (void)
26 if (this->mb_last_ != 0)
28 this->mb_last_->release ();
29 this->mb_last_ = 0;
33 int
34 ACE_UPIPE_Stream::control (int cmd,
35 void * val) const
37 ACE_TRACE ("ACE_UPIPE_Stream::control");
39 return ((ACE_UPIPE_Stream *) this)->stream_.control
40 ((ACE_IO_Cntl_Msg::ACE_IO_Cntl_Cmds) cmd, val);
43 void
44 ACE_UPIPE_Stream::dump (void) const
46 #if defined (ACE_HAS_DUMP)
47 ACE_TRACE ("ACE_UPIPE_Stream::dump");
48 #endif /* ACE_HAS_DUMP */
51 int
52 ACE_UPIPE_Stream::close (void)
54 ACE_TRACE ("ACE_UPIPE_Stream::close");
55 ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->lock_, -1));
57 this->reference_count_--;
59 if (this->reference_count_ == 0)
61 // Since the UPIPE should have been closed earlier we won't bother
62 // checking to see if closing it now fails.
64 if (this->ACE_SPIPE::get_handle () != ACE_INVALID_HANDLE)
65 this->ACE_SPIPE::close ();
67 // Close down the ACE_stream.
68 return this->stream_.close ();
70 return 0;
73 int
74 ACE_UPIPE_Stream::get_remote_addr (ACE_UPIPE_Addr &remote_sap) const
76 ACE_TRACE ("ACE_UPIPE_Stream::get_remote_addr");
77 remote_sap = this->remote_addr_;
78 return 0;
81 int
82 ACE_UPIPE_Stream::send (ACE_Message_Block *mb_p,
83 ACE_Time_Value *timeout)
85 ACE_TRACE ("ACE_UPIPE_Stream::send_msg");
86 return this->stream_.put (mb_p, timeout) == -1 ? -1 : 0;
89 int ACE_UPIPE_Stream::recv (ACE_Message_Block *& mb_p,
90 ACE_Time_Value *timeout)
92 return this->stream_.get (mb_p, timeout) == -1 ? -1 : 0;
95 // Send a buffer.
97 ssize_t
98 ACE_UPIPE_Stream::send (const char *buffer,
99 size_t n,
100 ACE_Time_Value *timeout)
102 ACE_TRACE ("ACE_UPIPE_Stream::send");
104 ACE_Message_Block *mb_p;
105 ACE_NEW_RETURN (mb_p,
106 ACE_Message_Block (n),
107 -1);
108 mb_p->copy (buffer, n);
109 return
110 this->stream_.put (mb_p, timeout) == -1
111 ? -1
112 : static_cast<ssize_t> (n);
115 // Receive a buffer.
117 ssize_t
118 ACE_UPIPE_Stream::recv (char *buffer,
119 size_t n,
120 ACE_Time_Value *timeout)
122 ACE_TRACE ("ACE_UPIPE_Stream::recv");
123 // Index in buffer.
124 size_t bytes_read = 0;
126 while (bytes_read < n)
127 if (this->mb_last_ != 0)
129 // We have remaining data in our last read Message_Buffer.
130 size_t this_len = this->mb_last_->length ();
131 if (this_len < n)
133 // The remaining data is not enough.
135 ACE_OS::memcpy ((void *) &buffer[bytes_read],
136 this->mb_last_->rd_ptr (),
137 this_len);
138 bytes_read += this_len;
139 this->mb_last_ = this->mb_last_->release (); // mb_last_ now 0
140 return static_cast<ssize_t> (bytes_read);
142 else
144 // The remaining data is at least enough. If there's
145 // more, we'll get it the next time through.
146 ACE_OS::memcpy (&buffer[bytes_read],
147 this->mb_last_->rd_ptr (),
149 bytes_read += n;
151 // Advance rd_ptr.
152 this->mb_last_->rd_ptr (n);
154 if (this->mb_last_->length () == 0)
155 // Now the Message_Buffer is empty.
156 this->mb_last_ = this->mb_last_->release ();
159 else
161 // We have to get a new Message_Buffer from our stream.
162 int result = this->stream_.get (this->mb_last_, timeout);
164 if (result == -1)
166 if (errno == EWOULDBLOCK && bytes_read > 0)
167 // Return the number of bytes read before we timed out.
168 return static_cast<ssize_t> (bytes_read);
169 else
170 return -1;
174 return static_cast<ssize_t> (bytes_read);
177 ssize_t
178 ACE_UPIPE_Stream::send_n (const char *buf,
179 size_t n,
180 ACE_Time_Value *timeout)
182 ACE_TRACE ("ACE_UPIPE_Stream::send_n");
184 size_t bytes_written;
185 ssize_t len = 0;
187 for (bytes_written = 0; bytes_written < n; bytes_written += len)
189 len = this->send (buf + bytes_written,
190 n - bytes_written,
191 timeout);
193 if (len == -1)
195 return -1;
199 return static_cast<ssize_t> (bytes_written);
202 ssize_t
203 ACE_UPIPE_Stream::recv_n (char *buf,
204 size_t n,
205 ACE_Time_Value *timeout)
207 ACE_TRACE ("ACE_UPIPE_Stream::recv_n");
208 size_t bytes_read;
209 ssize_t len = 0;
211 for (bytes_read = 0; bytes_read < n; bytes_read += len)
213 len = this->recv (buf + bytes_read,
214 n - bytes_read,
215 timeout);
217 if (len == -1)
219 return -1;
221 else if (len == 0)
223 break;
227 return static_cast< ssize_t> (bytes_read);
230 ACE_END_VERSIONED_NAMESPACE_DECL
232 #endif /* ACE_HAS_THREADS */