1 // author : Boris Kolpackov <boris@kolpackov.net>
2 #include "ace/OS_Memory.h"
3 #include "ace/OS_NS_stdio.h"
4 #include "ace/OS_NS_stdlib.h"
5 #include "ace/OS_NS_string.h"
6 #include "ace/OS_NS_unistd.h"
7 #include "ace/OS_NS_sys_time.h" // gettimeofday
9 #include "ace/Unbounded_Queue.h"
18 #include "Reassemble.h"
19 #include "Acknowledge.h"
20 #include "Retransmit.h"
// Socket_Impl: derives (protected) from Element and holds the state declared
// below: the parameters, a queue of messages, a signal pipe and one owned
// instance of each stack element.
// NOTE(review): this view of the class is fragmentary (braces, access
// specifiers and some declarations are not visible).
34 class Socket_Impl
: protected Element
// Constructor; its definition creates the stack elements and chains them.
39 Socket_Impl (Address
const& a
, bool loop
, Parameters
const& params
);
// Takes a raw buffer and its size in bytes.
43 send_ (void const* buf
, size_t s
);
48 ACE_Time_Value
const* timeout
,
// Takes the timeout by pointer; Socket::size () passes 0 for it.
52 size_ (ACE_Time_Value
const* timeout
);
58 //FUZZ: disable check_for_lack_ACE_OS
// Message entry point; the definition enqueues the message on queue_.
59 virtual void recv (Message_ptr m
);
60 //FUZZ: enable check_for_lack_ACE_OS
// Parameters handed to every stack element's constructor.
64 Parameters
const params_
;
// Messages enqueued by recv () and dequeued by the receive/size routines.
69 ACE_Unbounded_Queue
<Message_ptr
> queue_
;
// Pipe that is opened the first time its read handle is requested.
71 ACE_Pipe signal_pipe_
;
// Owned stack elements; all are created in the constructor.
73 std::unique_ptr
<Fragment
> fragment_
;
74 std::unique_ptr
<Reassemble
> reassemble_
;
75 std::unique_ptr
<Acknowledge
> acknowledge_
;
76 std::unique_ptr
<Retransmit
> retransmit_
;
77 std::unique_ptr
<Flow
> flow_
;
78 std::unique_ptr
<Link
> link_
;
// Constructor definition. Creates each stack element from params_ (Link also
// receives the address a), then chains the elements in both directions.
// NOTE(review): the initializer list and any use of `loop` are not visible in
// this view.
83 Socket_Impl (Address
const& a
, bool loop
, Parameters
const& params
)
// Allocate the stack elements.
88 fragment_
.reset (new Fragment (params_
));
89 reassemble_
.reset (new Reassemble (params_
));
90 acknowledge_
.reset (new Acknowledge (params_
));
91 retransmit_
.reset (new Retransmit (params_
));
92 flow_
.reset (new Flow (params_
));
93 link_
.reset (new Link (a
, params_
));
95 // Start IN stack from top to bottom.
// Each in_start () is passed the element started just before it; fragment_
// is passed this object.
98 fragment_
->in_start (this);
99 reassemble_
->in_start (fragment_
.get ());
100 acknowledge_
->in_start (reassemble_
.get ());
101 retransmit_
->in_start (acknowledge_
.get ());
102 flow_
->in_start (retransmit_
.get ());
103 link_
->in_start (flow_
.get ());
105 // Start OUT stack from bottom up.
// link_ is started with 0; every other element is passed the one started
// just before it, and finally this object is passed fragment_.
107 link_
->out_start (0);
108 flow_
->out_start (link_
.get ());
109 retransmit_
->out_start (flow_
.get ());
110 acknowledge_
->out_start (retransmit_
.get ());
111 reassemble_
->out_start (acknowledge_
.get ());
112 fragment_
->out_start (reassemble_
.get ());
113 out_start (fragment_
.get ());
// NOTE(review): the enclosing function header is not visible here; this is
// presumably the body of the Socket_Impl destructor (confirm).
// It stops the OUT side of the elements, then the IN side in reverse order.
119 // Stop OUT stack from top to bottom.
122 fragment_
->out_stop ();
123 reassemble_
->out_stop ();
124 acknowledge_
->out_stop ();
125 retransmit_
->out_stop ();
129 // Stop IN stack from bottom up.
133 retransmit_
->in_stop ();
134 acknowledge_
->in_stop ();
135 reassemble_
->in_stop ();
136 fragment_
->in_stop ();
139 // Close signal pipe.
// The pipe is closed only when its read handle is valid, i.e. it was opened.
141 if (signal_pipe_
.read_handle () != ACE_INVALID_HANDLE
)
142 signal_pipe_
.close ();
// send_: builds a new Message and attaches a Data profile constructed from
// the caller's buffer (buf) and size (s).
// NOTE(review): the statement that hands the message to the stack is not
// visible in this view.
147 send_ (void const* buf
, size_t s
)
149 Message_ptr
m (new Message
);
151 m
->add (Profile_ptr (new Data (buf
, s
)));
153 // Qualification is for VC6 and VxWorks.
// NOTE(review): the line naming this function is not visible. Socket::recv
// calls impl_->recv_ (buf, s, timeout, from), so this is presumably
// Socket_Impl::recv_ (confirm).
// Waits until queue_ is non-empty, dequeues one message, optionally reports
// the sender address and copies the payload into buf.
158 ssize_t
Socket_Impl::
161 ACE_Time_Value
const* timeout
,
164 ACE_Time_Value abs_time
;
// Turn the relative timeout into an absolute time for the timed wait.
// NOTE(review): timeout is dereferenced here and callers pass 0; a null guard
// is presumably on a line not shown (confirm).
167 abs_time
= ACE_OS::gettimeofday () + *timeout
;
// Wait while there is nothing queued.
171 while (queue_
.is_empty ())
// Timed wait against abs_time.
175 if (cond_
.wait (&abs_time
) != -1)
// Untimed wait.
180 if (cond_
.wait () != -1)
184 return -1; // errno is already set
190 if (queue_
.dequeue_head (m
) == -1)
194 if (queue_
.is_empty ())
196 // Remove data from the pipe.
198 if (signal_pipe_
.read_handle () != ACE_INVALID_HANDLE
)
// Read a single byte from the pipe; anything other than 1 is reported.
202 if (signal_pipe_
.recv (&c
, 1) != 1)
204 ACE_OS::perror ("read: ");
// Report the sender: the address stored in the message's From profile.
// NOTE(review): the find () result is dereferenced without a visible null
// check.
211 *from
= static_cast<From
const*> (m
->find (From::id
))->address ();
// Messages carrying a NoData profile are tested for separately.
213 if (m
->find (NoData::id
) != 0)
219 Data
const* d
= static_cast<Data
const*>(m
->find (Data::id
));
// Copy at most s bytes: the smaller of the payload size and s.
221 ssize_t
r (static_cast<ssize_t
> (d
->size () < s
? d
->size () : s
));
223 ACE_OS::memcpy (buf
, d
->buf (), r
);
// size_: waits until queue_ is non-empty, then returns the size of the Data
// profile of the message at the head of the queue. The message is dequeued
// and put back at the head rather than consumed.
228 ssize_t
Socket_Impl::
229 size_ (ACE_Time_Value
const* timeout
)
231 ACE_Time_Value abs_time
;
// Turn the relative timeout into an absolute time for the timed wait.
// NOTE(review): timeout is dereferenced here and Socket::size () passes 0; a
// null guard is presumably on a line not shown (confirm).
234 abs_time
= ACE_OS::gettimeofday () + *timeout
;
// Wait while there is nothing queued.
238 while (queue_
.is_empty ())
// Timed wait against abs_time.
242 if (cond_
.wait (&abs_time
) != -1)
// Untimed wait.
247 if (cond_
.wait () != -1)
251 return -1; // errno is already set
254 // I can't get the head of the queue without actually dequeuing
259 if (queue_
.dequeue_head (m
) == -1)
// Put the message back at the head of the queue.
262 if (queue_
.enqueue_head (m
) == -1)
// Messages carrying a NoData profile are tested for separately.
265 if (m
->find (NoData::id
) != 0)
271 Data
const* d
= static_cast<Data
const*>(m
->find (Data::id
));
273 return static_cast<ssize_t
> (d
->size ());
// NOTE(review): the line naming this function is not visible. Socket calls
// impl_->get_handle_ (), so this is presumably Socket_Impl::get_handle_
// (confirm).
// Opens the signal pipe if its read handle is still invalid, then returns
// the read handle.
276 ACE_HANDLE
Socket_Impl::
279 if (signal_pipe_
.read_handle () == ACE_INVALID_HANDLE
)
281 signal_pipe_
.open ();
284 return signal_pipe_
.read_handle ();
// recv: handles an incoming message. Messages carrying a Data or NoData
// profile are appended to queue_, and a byte is written to the signal pipe
// when the pipe is open.
288 void Socket_Impl::recv (Message_ptr m
)
290 if (m
->find (Data::id
) != 0 || m
->find (NoData::id
) != 0)
// Destination address taken from the message's To profile.
294 Address
to (static_cast<To
const*> (m
->find (To::id
))->address ());
// Source address taken from the message's From profile.
297 static_cast<From
const*> (m
->find (From::id
))->address ());
305 //if (queue_.size () != 0)
306 // cerr << "recv socket queue size: " << queue_.size () << endl;
308 //FUZZ: disable check_for_lack_ACE_OS
// Record whether the queue was empty before this message is enqueued.
309 bool signal (queue_
.is_empty ());
310 //FUZZ: enable check_for_lack_ACE_OS
312 queue_
.enqueue_tail (m
);
316 // Also write to the pipe.
317 if (signal_pipe_
.write_handle () != ACE_INVALID_HANDLE
)
// Write a single byte; a result other than 1 is detected here.
321 if (signal_pipe_
.send (&c
, 1) != 1)
323 // perror ("write: ");
// Socket constructor: allocates the Socket_Impl, forwarding the address, the
// loop flag and the parameters unchanged.
343 Socket (Address
const& a
, bool loop
, Parameters
const& params
)
344 : impl_ (new Socket_Impl (a
, loop
, params
))
// Sends s bytes from buf by delegating to the implementation's send_ ().
348 void Socket::send (void const* buf
, size_t s
)
350 impl_
->send_ (buf
, s
);
// Receive overload without timeout or sender address: passes 0 for both.
353 ssize_t
Socket::recv (void* buf
, size_t s
)
355 return impl_
->recv_ (buf
, s
, 0, 0);
// Receive overload that reports the sender: passes 0 for the timeout and the
// address of `from` for the sender.
358 ssize_t
Socket::recv (void* buf
, size_t s
, ACE_INET_Addr
& from
)
360 return impl_
->recv_ (buf
, s
, 0, &from
);
// Receive overload with a timeout: passes the address of `timeout` and 0 for
// the sender.
363 ssize_t
Socket::recv (void* buf
, size_t s
, ACE_Time_Value
const& timeout
)
365 return impl_
->recv_ (buf
, s
, &timeout
, 0);
// Receive overload with both a timeout and a sender address; both are passed
// to the implementation by pointer.
// NOTE(review): parts of the parameter list are not visible in this view.
368 ssize_t
Socket::recv (void* buf
,
370 ACE_Time_Value
const& timeout
,
373 return impl_
->recv_ (buf
, s
, &timeout
, &from
);
// NOTE(review): the function header is not visible; presumably the size ()
// overload without a timeout (confirm). Passes 0 as the timeout to size_ ().
379 return impl_
->size_ (0);
// size overload with a timeout: passes the address of `timeout` to the
// implementation's size_ ().
383 size (ACE_Time_Value
const& timeout
)
385 return impl_
->size_ (&timeout
);
// NOTE(review): the function header is not visible; presumably
// Socket::get_handle (confirm). Returns the implementation's get_handle_ ()
// result.
391 return impl_
->get_handle_ ();