1 // author : Boris Kolpackov <boris@kolpackov.net>
2 #include "ace/Time_Value.h" // ACE_Time_Value
3 #include "ace/OS_NS_stdio.h"
4 #include "ace/OS_NS_stdlib.h"
5 #include "ace/OS_NS_time.h"
6 #include "ace/OS_NS_sys_socket.h"
20 Link (Address
const& addr
, Parameters
const& params
)
23 ssock_ (Address (static_cast<unsigned short> (0),
24 static_cast<ACE_UINT32
> (INADDR_ANY
)),
31 ACE_OS::srand ((unsigned int) ACE_OS::time (0));
34 rsock_
.set_option (IP_MULTICAST_LOOP
, 0);
35 // rsock_.set_option (IP_MULTICAST_TTL, 0);
37 // Set recv/send buffers.
43 static_cast<ACE_SOCK
&> (rsock_
).set_option (
44 SOL_SOCKET
, SO_RCVBUF
, &r
, s
);
46 static_cast<ACE_SOCK
&> (ssock_
).set_option (
47 SOL_SOCKET
, SO_RCVBUF
, &r
, s
);
49 rsock_
.get_option (SOL_SOCKET
, SO_RCVBUF
, &r
, &s
);
50 //cerr << 5 << "recv buffer size: " << r << endl;
52 ssock_
.get_option (SOL_SOCKET
, SO_RCVBUF
, &r
, &s
);
53 //cerr << 5 << "send buffer size: " << r << endl;
56 // Bind address and port.
58 if (ACE_OS::connect (ssock_
.get_handle (),
59 reinterpret_cast<sockaddr
*> (addr_
.get_addr ()),
60 addr_
.get_addr_size ()) == -1)
62 ACE_OS::perror ("connect: ");
67 ssock_
.get_local_addr (self_
);
69 //cerr << 5 << "self: " << self_ << endl;
73 in_start (In_Element
* in
)
75 Element::in_start (in
);
79 // Start receiving thread.
81 recv_mgr_
.spawn (recv_thunk
, this);
85 out_start (Out_Element
* out
)
87 Element::out_start (out
);
93 // Stop receiving thread.
104 void Link::send (Message_ptr m
)
106 // Simulate message loss and reordering.
108 if (params_
.simulator ())
110 if ((ACE_OS::rand () % 17) != 0)
118 hold_
= Message_ptr (0);
122 if ((ACE_OS::rand () % 17) != 0)
130 // Make a copy in M so that the reliable loop below
131 // won't add FROM and TO to HOLD_.
143 m
->add (Profile_ptr (new From (self_
)));
144 m
->add (Profile_ptr (new To (self_
)));
150 send_ (Message_ptr m
)
152 ostream
os (m
->size (), 1); // Always little-endian.
156 if (os
.length () > size_t (params_
.max_packet_size ()))
158 ACE_ERROR ((LM_ERROR
,
159 "packet length (%d) exceeds max_poacket_size (%d)\n",
160 os
.length (), params_
.max_packet_size ()));
162 for (Message::ProfileIterator
i (m
->begin ()); !i
.done (); i
.advance ())
164 ACE_ERROR ((LM_ERROR
,
165 "profile id: %d; size: %d\n",
166 (*i
).ext_id_
, (*i
).int_id_
->size ()));
172 ssock_
.send (os
.buffer (), os
.length (), addr_
);
175 if (m->find (nrtm::id))
177 ACE_OS::write (1, os.buffer (), os.length ());
185 size_t max_packet_size (params_
.max_packet_size ());
187 std::unique_ptr
<char[]> holder (new char[max_packet_size
+ ACE_CDR::MAX_ALIGNMENT
]);
189 char* data
= ACE_ptr_align_binary (holder
.get (), ACE_CDR::MAX_ALIGNMENT
);
195 //@@ Should I lock here?
200 // Block for up to one tick waiting for an incomming message.
204 ACE_Time_Value
t (params_
.tick ());
205 ssize_t r
= rsock_
.recv (data
, 4, addr
, MSG_PEEK
, &t
);
208 // Check for cancellation request.
223 size
= static_cast<size_t> (r
);
229 if (size
!= 4 || addr
== self_
)
231 // Discard bad messages and ones from ourselvs since
232 // we are using reliable loopback.
234 rsock_
.recv (data
, 0, addr
);
240 istream
is (data
, size
, 1); // Always little-endian.
244 if (msg_size
<= 4 || msg_size
> max_packet_size
)
248 rsock_
.recv (data
, 0, addr
);
252 size
= rsock_
.recv (data
, max_packet_size
, addr
);
254 if (msg_size
!= size
)
261 //cerr << 6 << "from: " << addr << endl;
263 Message_ptr
m (new Message ());
265 m
->add (Profile_ptr (new From (addr
)));
266 m
->add (Profile_ptr (new To (self_
)));
268 istream
is (data
, size
, 1); // Always little-endian.
276 if (!((is
>> id
) && (is
>> size
))) break;
278 //cerr << 6 << "reading profile with id " << id << " "
279 // << size << " bytes long" << endl;
281 Profile::Header
hdr (id
, size
);
285 m
->add (Profile_ptr (new SN (hdr
, is
)));
287 else if (id
== Data::id
)
289 m
->add (Profile_ptr (new Data (hdr
, is
)));
291 else if (id
== NAK::id
)
293 m
->add (Profile_ptr (new NAK (hdr
, is
)));
295 else if (id
== NRTM::id
)
297 m
->add (Profile_ptr (new NRTM (hdr
, is
)));
299 else if (id
== NoData::id
)
301 m
->add (Profile_ptr (new NoData (hdr
, is
)));
303 else if (id
== Part::id
)
305 m
->add (Profile_ptr (new Part (hdr
, is
)));
309 //cerr << 0 << "unknown profile id " << hdr.id () << endl;
318 ACE_THR_FUNC_RETURN
Link::
319 recv_thunk (void* obj
)
321 reinterpret_cast<Link
*> (obj
)->recv ();
325 void Link::recv (Message_ptr
)