1 #include "ace/UPIPE_Stream.h"
3 #if defined (ACE_HAS_THREADS)
5 #include "ace/OS_NS_string.h"
7 #if !defined (__ACE_INLINE__)
8 #include "ace/UPIPE_Stream.inl"
9 #endif /* __ACE_INLINE__ */
11 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
13 ACE_ALLOC_HOOK_DEFINE(ACE_UPIPE_Stream
)
15 ACE_UPIPE_Stream::ACE_UPIPE_Stream ()
19 ACE_TRACE ("ACE_UPIPE_Stream::ACE_UPIPE_STREAM");
22 ACE_UPIPE_Stream::~ACE_UPIPE_Stream ()
24 if (this->mb_last_
!= 0)
26 this->mb_last_
->release ();
32 ACE_UPIPE_Stream::control (int cmd
,
35 ACE_TRACE ("ACE_UPIPE_Stream::control");
37 return ((ACE_UPIPE_Stream
*) this)->stream_
.control
38 ((ACE_IO_Cntl_Msg::ACE_IO_Cntl_Cmds
) cmd
, val
);
42 ACE_UPIPE_Stream::dump () const
44 #if defined (ACE_HAS_DUMP)
45 ACE_TRACE ("ACE_UPIPE_Stream::dump");
46 #endif /* ACE_HAS_DUMP */
50 ACE_UPIPE_Stream::close ()
52 ACE_TRACE ("ACE_UPIPE_Stream::close");
53 ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex
, ace_mon
, this->lock_
, -1));
55 this->reference_count_
--;
57 if (this->reference_count_
== 0)
59 // Since the UPIPE should have been closed earlier we won't bother
60 // checking to see if closing it now fails.
62 if (this->ACE_SPIPE::get_handle () != ACE_INVALID_HANDLE
)
63 this->ACE_SPIPE::close ();
65 // Close down the ACE_stream.
66 return this->stream_
.close ();
72 ACE_UPIPE_Stream::get_remote_addr (ACE_UPIPE_Addr
&remote_sap
) const
74 ACE_TRACE ("ACE_UPIPE_Stream::get_remote_addr");
75 remote_sap
= this->remote_addr_
;
80 ACE_UPIPE_Stream::send (ACE_Message_Block
*mb_p
,
81 ACE_Time_Value
*timeout
)
83 ACE_TRACE ("ACE_UPIPE_Stream::send_msg");
84 return this->stream_
.put (mb_p
, timeout
) == -1 ? -1 : 0;
87 int ACE_UPIPE_Stream::recv (ACE_Message_Block
*& mb_p
,
88 ACE_Time_Value
*timeout
)
90 return this->stream_
.get (mb_p
, timeout
) == -1 ? -1 : 0;
96 ACE_UPIPE_Stream::send (const char *buffer
,
98 ACE_Time_Value
*timeout
)
100 ACE_TRACE ("ACE_UPIPE_Stream::send");
102 ACE_Message_Block
*mb_p
;
103 ACE_NEW_RETURN (mb_p
,
104 ACE_Message_Block (n
),
106 mb_p
->copy (buffer
, n
);
108 this->stream_
.put (mb_p
, timeout
) == -1
110 : static_cast<ssize_t
> (n
);
116 ACE_UPIPE_Stream::recv (char *buffer
,
118 ACE_Time_Value
*timeout
)
120 ACE_TRACE ("ACE_UPIPE_Stream::recv");
122 size_t bytes_read
= 0;
124 while (bytes_read
< n
)
125 if (this->mb_last_
!= 0)
127 // We have remaining data in our last read Message_Buffer.
128 size_t this_len
= this->mb_last_
->length ();
131 // The remaining data is not enough.
133 ACE_OS::memcpy ((void *) &buffer
[bytes_read
],
134 this->mb_last_
->rd_ptr (),
136 bytes_read
+= this_len
;
137 this->mb_last_
= this->mb_last_
->release (); // mb_last_ now 0
138 return static_cast<ssize_t
> (bytes_read
);
142 // The remaining data is at least enough. If there's
143 // more, we'll get it the next time through.
144 ACE_OS::memcpy (&buffer
[bytes_read
],
145 this->mb_last_
->rd_ptr (),
150 this->mb_last_
->rd_ptr (n
);
152 if (this->mb_last_
->length () == 0)
153 // Now the Message_Buffer is empty.
154 this->mb_last_
= this->mb_last_
->release ();
159 // We have to get a new Message_Buffer from our stream.
160 int result
= this->stream_
.get (this->mb_last_
, timeout
);
164 if (errno
== EWOULDBLOCK
&& bytes_read
> 0)
165 // Return the number of bytes read before we timed out.
166 return static_cast<ssize_t
> (bytes_read
);
172 return static_cast<ssize_t
> (bytes_read
);
176 ACE_UPIPE_Stream::send_n (const char *buf
,
178 ACE_Time_Value
*timeout
)
180 ACE_TRACE ("ACE_UPIPE_Stream::send_n");
182 size_t bytes_written
;
185 for (bytes_written
= 0; bytes_written
< n
; bytes_written
+= len
)
187 len
= this->send (buf
+ bytes_written
,
197 return static_cast<ssize_t
> (bytes_written
);
201 ACE_UPIPE_Stream::recv_n (char *buf
,
203 ACE_Time_Value
*timeout
)
205 ACE_TRACE ("ACE_UPIPE_Stream::recv_n");
209 for (bytes_read
= 0; bytes_read
< n
; bytes_read
+= len
)
211 len
= this->recv (buf
+ bytes_read
,
225 return static_cast< ssize_t
> (bytes_read
);
228 ACE_END_VERSIONED_NAMESPACE_DECL
230 #endif /* ACE_HAS_THREADS */