1 #include "ace/UPIPE_Stream.h"
5 #if defined (ACE_HAS_THREADS)
7 #include "ace/OS_NS_string.h"
9 #if !defined (__ACE_INLINE__)
10 #include "ace/UPIPE_Stream.inl"
11 #endif /* __ACE_INLINE__ */
13 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
15 ACE_ALLOC_HOOK_DEFINE(ACE_UPIPE_Stream
)
17 ACE_UPIPE_Stream::ACE_UPIPE_Stream (void)
21 ACE_TRACE ("ACE_UPIPE_Stream::ACE_UPIPE_STREAM");
24 ACE_UPIPE_Stream::~ACE_UPIPE_Stream (void)
26 if (this->mb_last_
!= 0)
28 this->mb_last_
->release ();
34 ACE_UPIPE_Stream::control (int cmd
,
37 ACE_TRACE ("ACE_UPIPE_Stream::control");
39 return ((ACE_UPIPE_Stream
*) this)->stream_
.control
40 ((ACE_IO_Cntl_Msg::ACE_IO_Cntl_Cmds
) cmd
, val
);
44 ACE_UPIPE_Stream::dump (void) const
46 #if defined (ACE_HAS_DUMP)
47 ACE_TRACE ("ACE_UPIPE_Stream::dump");
48 #endif /* ACE_HAS_DUMP */
52 ACE_UPIPE_Stream::close (void)
54 ACE_TRACE ("ACE_UPIPE_Stream::close");
55 ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex
, ace_mon
, this->lock_
, -1));
57 this->reference_count_
--;
59 if (this->reference_count_
== 0)
61 // Since the UPIPE should have been closed earlier we won't bother
62 // checking to see if closing it now fails.
64 if (this->ACE_SPIPE::get_handle () != ACE_INVALID_HANDLE
)
65 this->ACE_SPIPE::close ();
67 // Close down the ACE_stream.
68 return this->stream_
.close ();
74 ACE_UPIPE_Stream::get_remote_addr (ACE_UPIPE_Addr
&remote_sap
) const
76 ACE_TRACE ("ACE_UPIPE_Stream::get_remote_addr");
77 remote_sap
= this->remote_addr_
;
82 ACE_UPIPE_Stream::send (ACE_Message_Block
*mb_p
,
83 ACE_Time_Value
*timeout
)
85 ACE_TRACE ("ACE_UPIPE_Stream::send_msg");
86 return this->stream_
.put (mb_p
, timeout
) == -1 ? -1 : 0;
89 int ACE_UPIPE_Stream::recv (ACE_Message_Block
*& mb_p
,
90 ACE_Time_Value
*timeout
)
92 return this->stream_
.get (mb_p
, timeout
) == -1 ? -1 : 0;
98 ACE_UPIPE_Stream::send (const char *buffer
,
100 ACE_Time_Value
*timeout
)
102 ACE_TRACE ("ACE_UPIPE_Stream::send");
104 ACE_Message_Block
*mb_p
;
105 ACE_NEW_RETURN (mb_p
,
106 ACE_Message_Block (n
),
108 mb_p
->copy (buffer
, n
);
110 this->stream_
.put (mb_p
, timeout
) == -1
112 : static_cast<ssize_t
> (n
);
118 ACE_UPIPE_Stream::recv (char *buffer
,
120 ACE_Time_Value
*timeout
)
122 ACE_TRACE ("ACE_UPIPE_Stream::recv");
124 size_t bytes_read
= 0;
126 while (bytes_read
< n
)
127 if (this->mb_last_
!= 0)
129 // We have remaining data in our last read Message_Buffer.
130 size_t this_len
= this->mb_last_
->length ();
133 // The remaining data is not enough.
135 ACE_OS::memcpy ((void *) &buffer
[bytes_read
],
136 this->mb_last_
->rd_ptr (),
138 bytes_read
+= this_len
;
139 this->mb_last_
= this->mb_last_
->release (); // mb_last_ now 0
140 return static_cast<ssize_t
> (bytes_read
);
144 // The remaining data is at least enough. If there's
145 // more, we'll get it the next time through.
146 ACE_OS::memcpy (&buffer
[bytes_read
],
147 this->mb_last_
->rd_ptr (),
152 this->mb_last_
->rd_ptr (n
);
154 if (this->mb_last_
->length () == 0)
155 // Now the Message_Buffer is empty.
156 this->mb_last_
= this->mb_last_
->release ();
161 // We have to get a new Message_Buffer from our stream.
162 int result
= this->stream_
.get (this->mb_last_
, timeout
);
166 if (errno
== EWOULDBLOCK
&& bytes_read
> 0)
167 // Return the number of bytes read before we timed out.
168 return static_cast<ssize_t
> (bytes_read
);
174 return static_cast<ssize_t
> (bytes_read
);
178 ACE_UPIPE_Stream::send_n (const char *buf
,
180 ACE_Time_Value
*timeout
)
182 ACE_TRACE ("ACE_UPIPE_Stream::send_n");
184 size_t bytes_written
;
187 for (bytes_written
= 0; bytes_written
< n
; bytes_written
+= len
)
189 len
= this->send (buf
+ bytes_written
,
199 return static_cast<ssize_t
> (bytes_written
);
203 ACE_UPIPE_Stream::recv_n (char *buf
,
205 ACE_Time_Value
*timeout
)
207 ACE_TRACE ("ACE_UPIPE_Stream::recv_n");
211 for (bytes_read
= 0; bytes_read
< n
; bytes_read
+= len
)
213 len
= this->recv (buf
+ bytes_read
,
227 return static_cast< ssize_t
> (bytes_read
);
230 ACE_END_VERSIONED_NAMESPACE_DECL
232 #endif /* ACE_HAS_THREADS */