Cleanup ACE_HAS_PTHREAD_SIGMASK_PROTOTYPE, all platforms support it so far as I can...
[ACE_TAO.git] / ACE / protocols / ace / RMCast / Socket.cpp
blobe9587d4b477e62d5a26cedeca492811c17eb0bc9
1 // author : Boris Kolpackov <boris@kolpackov.net>
2 #include "ace/OS_Memory.h"
3 #include "ace/OS_NS_stdio.h"
4 #include "ace/OS_NS_stdlib.h"
5 #include "ace/OS_NS_string.h"
6 #include "ace/OS_NS_unistd.h"
7 #include "ace/OS_NS_sys_time.h" // gettimeofday
9 #include "ace/Unbounded_Queue.h"
11 #include "ace/Pipe.h"
13 #include "Stack.h"
14 #include "Protocol.h"
15 #include "Bits.h"
17 #include "Fragment.h"
18 #include "Reassemble.h"
19 #include "Acknowledge.h"
20 #include "Retransmit.h"
21 #include "Flow.h"
22 #include "Link.h"
24 #include "Socket.h"
27 #include <iostream>
28 using std::cerr;
29 using std::endl;
32 namespace ACE_RMCast
34 class Socket_Impl : protected Element
36 public:
37 ~Socket_Impl ();
39 Socket_Impl (Address const& a, bool loop, Parameters const& params);
41 public:
42 void
43 send_ (void const* buf, size_t s);
45 ssize_t
46 recv_ (void* buf,
47 size_t s,
48 ACE_Time_Value const* timeout,
49 ACE_INET_Addr* from);
51 ssize_t
52 size_ (ACE_Time_Value const* timeout);
54 ACE_HANDLE
55 get_handle_ ();
57 private:
58 //FUZZ: disable check_for_lack_ACE_OS
59 virtual void recv (Message_ptr m);
60 //FUZZ: enable check_for_lack_ACE_OS
62 private:
63 bool loop_;
64 Parameters const params_;
66 Mutex mutex_;
67 Condition cond_;
69 ACE_Unbounded_Queue<Message_ptr> queue_;
71 ACE_Pipe signal_pipe_;
73 std::unique_ptr<Fragment> fragment_;
74 std::unique_ptr<Reassemble> reassemble_;
75 std::unique_ptr<Acknowledge> acknowledge_;
76 std::unique_ptr<Retransmit> retransmit_;
77 std::unique_ptr<Flow> flow_;
78 std::unique_ptr<Link> link_;
82 Socket_Impl::
83 Socket_Impl (Address const& a, bool loop, Parameters const& params)
84 : loop_ (loop),
85 params_ (params),
86 cond_ (mutex_)
88 fragment_.reset (new Fragment (params_));
89 reassemble_.reset (new Reassemble (params_));
90 acknowledge_.reset (new Acknowledge (params_));
91 retransmit_.reset (new Retransmit (params_));
92 flow_.reset (new Flow (params_));
93 link_.reset (new Link (a, params_));
95 // Start IN stack from top to bottom.
97 in_start (0);
98 fragment_->in_start (this);
99 reassemble_->in_start (fragment_.get ());
100 acknowledge_->in_start (reassemble_.get ());
101 retransmit_->in_start (acknowledge_.get ());
102 flow_->in_start (retransmit_.get ());
103 link_->in_start (flow_.get ());
105 // Start OUT stack from bottom up.
107 link_->out_start (0);
108 flow_->out_start (link_.get ());
109 retransmit_->out_start (flow_.get ());
110 acknowledge_->out_start (retransmit_.get ());
111 reassemble_->out_start (acknowledge_.get ());
112 fragment_->out_start (reassemble_.get ());
113 out_start (fragment_.get ());
116 Socket_Impl::
117 ~Socket_Impl ()
119 // Stop OUT stack from top to bottom.
121 out_stop ();
122 fragment_->out_stop ();
123 reassemble_->out_stop ();
124 acknowledge_->out_stop ();
125 retransmit_->out_stop ();
126 flow_->out_stop ();
127 link_->out_stop ();
129 // Stop IN stack from bottom up.
131 link_->in_stop ();
132 flow_->in_stop ();
133 retransmit_->in_stop ();
134 acknowledge_->in_stop ();
135 reassemble_->in_stop ();
136 fragment_->in_stop ();
137 in_stop ();
139 // Close signal pipe.
141 if (signal_pipe_.read_handle () != ACE_INVALID_HANDLE)
142 signal_pipe_.close ();
146 void Socket_Impl::
147 send_ (void const* buf, size_t s)
149 Message_ptr m (new Message);
151 m->add (Profile_ptr (new Data (buf, s)));
153 // Qualification is for VC6 and VxWorks.
155 Element::send (m);
158 ssize_t Socket_Impl::
159 recv_ (void* buf,
160 size_t s,
161 ACE_Time_Value const* timeout,
162 ACE_INET_Addr* from)
164 ACE_Time_Value abs_time;
166 if (timeout)
167 abs_time = ACE_OS::gettimeofday () + *timeout;
169 Lock l (mutex_);
171 while (queue_.is_empty ())
173 if (timeout)
175 if (cond_.wait (&abs_time) != -1)
176 break;
178 else
180 if (cond_.wait () != -1)
181 break;
184 return -1; // errno is already set
188 Message_ptr m;
190 if (queue_.dequeue_head (m) == -1)
191 ACE_OS::abort ();
194 if (queue_.is_empty ())
196 // Remove data from the pipe.
198 if (signal_pipe_.read_handle () != ACE_INVALID_HANDLE)
200 char c;
202 if (signal_pipe_.recv (&c, 1) != 1)
204 ACE_OS::perror ("read: ");
205 ACE_OS::abort ();
210 if (from)
211 *from = static_cast<From const*> (m->find (From::id))->address ();
213 if (m->find (NoData::id) != 0)
215 errno = ENOENT;
216 return -1;
219 Data const* d = static_cast<Data const*>(m->find (Data::id));
221 ssize_t r (static_cast<ssize_t> (d->size () < s ? d->size () : s));
223 ACE_OS::memcpy (buf, d->buf (), r);
225 return r;
228 ssize_t Socket_Impl::
229 size_ (ACE_Time_Value const* timeout)
231 ACE_Time_Value abs_time;
233 if (timeout)
234 abs_time = ACE_OS::gettimeofday () + *timeout;
236 Lock l (mutex_);
238 while (queue_.is_empty ())
240 if (timeout)
242 if (cond_.wait (&abs_time) != -1)
243 break;
245 else
247 if (cond_.wait () != -1)
248 break;
251 return -1; // errno is already set
254 // I can't get the head of the queue without actually dequeuing
255 // the element.
257 Message_ptr m;
259 if (queue_.dequeue_head (m) == -1)
260 ACE_OS::abort ();
262 if (queue_.enqueue_head (m) == -1)
263 ACE_OS::abort ();
265 if (m->find (NoData::id) != 0)
267 errno = ENOENT;
268 return -1;
271 Data const* d = static_cast<Data const*>(m->find (Data::id));
273 return static_cast<ssize_t> (d->size ());
276 ACE_HANDLE Socket_Impl::
277 get_handle_ ()
279 if (signal_pipe_.read_handle () == ACE_INVALID_HANDLE)
281 signal_pipe_.open ();
284 return signal_pipe_.read_handle ();
288 void Socket_Impl::recv (Message_ptr m)
290 if (m->find (Data::id) != 0 || m->find (NoData::id) != 0)
292 if (!loop_)
294 Address to (static_cast<To const*> (m->find (To::id))->address ());
296 Address from (
297 static_cast<From const*> (m->find (From::id))->address ());
299 if (to == from)
300 return;
303 Lock l (mutex_);
305 //if (queue_.size () != 0)
306 // cerr << "recv socket queue size: " << queue_.size () << endl;
308 //FUZZ: disable check_for_lack_ACE_OS
309 bool signal (queue_.is_empty ());
310 //FUZZ: enable check_for_lack_ACE_OS
312 queue_.enqueue_tail (m);
314 if (signal)
316 // Also write to the pipe.
317 if (signal_pipe_.write_handle () != ACE_INVALID_HANDLE)
319 char c;
321 if (signal_pipe_.send (&c, 1) != 1)
323 // perror ("write: ");
324 ACE_OS::abort ();
328 cond_.signal ();
334 // Socket
337 Socket::
338 ~Socket ()
342 Socket::
343 Socket (Address const& a, bool loop, Parameters const& params)
344 : impl_ (new Socket_Impl (a, loop, params))
348 void Socket::send (void const* buf, size_t s)
350 impl_->send_ (buf, s);
353 ssize_t Socket::recv (void* buf, size_t s)
355 return impl_->recv_ (buf, s, 0, 0);
358 ssize_t Socket::recv (void* buf, size_t s, ACE_INET_Addr& from)
360 return impl_->recv_ (buf, s, 0, &from);
363 ssize_t Socket::recv (void* buf, size_t s, ACE_Time_Value const& timeout)
365 return impl_->recv_ (buf, s, &timeout, 0);
368 ssize_t Socket::recv (void* buf,
369 size_t s,
370 ACE_Time_Value const& timeout,
371 ACE_INET_Addr& from)
373 return impl_->recv_ (buf, s, &timeout, &from);
376 ssize_t Socket::
377 size ()
379 return impl_->size_ (0);
382 ssize_t Socket::
383 size (ACE_Time_Value const& timeout)
385 return impl_->size_ (&timeout);
388 ACE_HANDLE Socket::
389 get_handle ()
391 return impl_->get_handle_ ();