Cleanup ACE_HAS_PTHREAD_SIGMASK_PROTOTYPE, all platforms support it so far as I can...
[ACE_TAO.git] / ACE / protocols / ace / RMCast / Link.cpp
blobb15cae36a874a1b512c154d58b2aec0c5c0ccc8a
1 // author : Boris Kolpackov <boris@kolpackov.net>
2 #include "ace/Time_Value.h" // ACE_Time_Value
3 #include "ace/OS_NS_stdio.h"
4 #include "ace/OS_NS_stdlib.h"
5 #include "ace/OS_NS_time.h"
6 #include "ace/OS_NS_sys_socket.h"
8 #include "Link.h"
10 namespace ACE_RMCast
12 Link::
13 ~Link ()
15 ssock_.close ();
16 rsock_.close ();
19 Link::
20 Link (Address const& addr, Parameters const& params)
21 : params_ (params),
22 addr_ (addr),
23 ssock_ (Address (static_cast<unsigned short> (0),
24 static_cast<ACE_UINT32> (INADDR_ANY)),
25 AF_INET,
26 IPPROTO_UDP,
27 1),
28 stop_ (false)
31 ACE_OS::srand ((unsigned int) ACE_OS::time (0));
34 rsock_.set_option (IP_MULTICAST_LOOP, 0);
35 // rsock_.set_option (IP_MULTICAST_TTL, 0);
37 // Set recv/send buffers.
40 int r (131070);
41 int s (sizeof (r));
43 static_cast<ACE_SOCK&> (rsock_).set_option (
44 SOL_SOCKET, SO_RCVBUF, &r, s);
46 static_cast<ACE_SOCK&> (ssock_).set_option (
47 SOL_SOCKET, SO_RCVBUF, &r, s);
49 rsock_.get_option (SOL_SOCKET, SO_RCVBUF, &r, &s);
50 //cerr << 5 << "recv buffer size: " << r << endl;
52 ssock_.get_option (SOL_SOCKET, SO_RCVBUF, &r, &s);
53 //cerr << 5 << "send buffer size: " << r << endl;
56 // Bind address and port.
58 if (ACE_OS::connect (ssock_.get_handle (),
59 reinterpret_cast<sockaddr*> (addr_.get_addr ()),
60 addr_.get_addr_size ()) == -1)
62 ACE_OS::perror ("connect: ");
63 ACE_OS::abort ();
67 ssock_.get_local_addr (self_);
69 //cerr << 5 << "self: " << self_ << endl;
72 void Link::
73 in_start (In_Element* in)
75 Element::in_start (in);
77 rsock_.join (addr_);
79 // Start receiving thread.
81 recv_mgr_.spawn (recv_thunk, this);
84 void Link::
85 out_start (Out_Element* out)
87 Element::out_start (out);
90 void Link::
91 in_stop ()
93 // Stop receiving thread.
96 Lock l (mutex_);
97 stop_ = true;
99 recv_mgr_.wait ();
101 Element::in_stop ();
104 void Link::send (Message_ptr m)
106 // Simulate message loss and reordering.
108 if (params_.simulator ())
110 if ((ACE_OS::rand () % 17) != 0)
112 Lock l (mutex_);
114 if (hold_.get ())
116 send_ (m);
117 send_ (hold_);
118 hold_ = Message_ptr (0);
120 else
122 if ((ACE_OS::rand () % 17) != 0)
124 send_ (m);
126 else
128 hold_ = m;
130 // Make a copy in M so that the reliable loop below
131 // won't add FROM and TO to HOLD_.
133 m = hold_->clone ();
138 else
139 send_ (m);
141 // Reliable loop.
143 m->add (Profile_ptr (new From (self_)));
144 m->add (Profile_ptr (new To (self_)));
146 in_->recv (m);
149 void Link::
150 send_ (Message_ptr m)
152 ostream os (m->size (), 1); // Always little-endian.
154 os << *m;
156 if (os.length () > size_t (params_.max_packet_size ()))
158 ACE_ERROR ((LM_ERROR,
159 "packet length (%d) exceeds max_poacket_size (%d)\n",
160 os.length (), params_.max_packet_size ()));
162 for (Message::ProfileIterator i (m->begin ()); !i.done (); i.advance ())
164 ACE_ERROR ((LM_ERROR,
165 "profile id: %d; size: %d\n",
166 (*i).ext_id_, (*i).int_id_->size ()));
169 ACE_OS::abort ();
172 ssock_.send (os.buffer (), os.length (), addr_);
175 if (m->find (nrtm::id))
177 ACE_OS::write (1, os.buffer (), os.length ());
178 ACE_OS::exit (1);
183 void Link::recv ()
185 size_t max_packet_size (params_.max_packet_size ());
187 std::unique_ptr<char[]> holder (new char[max_packet_size + ACE_CDR::MAX_ALIGNMENT]);
189 char* data = ACE_ptr_align_binary (holder.get (), ACE_CDR::MAX_ALIGNMENT);
191 size_t size (0);
193 while (true)
195 //@@ Should I lock here?
198 Address addr;
200 // Block for up to one tick waiting for an incomming message.
202 for (;;)
204 ACE_Time_Value t (params_.tick ());
205 ssize_t r = rsock_.recv (data, 4, addr, MSG_PEEK, &t);
208 // Check for cancellation request.
211 Lock l (mutex_);
212 if (stop_)
213 return;
216 if (r == -1)
218 if (errno != ETIME)
219 ACE_OS::abort ();
221 else
223 size = static_cast<size_t> (r);
224 break;
229 if (size != 4 || addr == self_)
231 // Discard bad messages and ones from ourselvs since
232 // we are using reliable loopback.
234 rsock_.recv (data, 0, addr);
235 continue;
238 u32 msg_size;
240 istream is (data, size, 1); // Always little-endian.
241 is >> msg_size;
244 if (msg_size <= 4 || msg_size > max_packet_size)
246 // Bad message.
248 rsock_.recv (data, 0, addr);
249 continue;
252 size = rsock_.recv (data, max_packet_size, addr);
254 if (msg_size != size)
256 // Bad message.
258 continue;
261 //cerr << 6 << "from: " << addr << endl;
263 Message_ptr m (new Message ());
265 m->add (Profile_ptr (new From (addr)));
266 m->add (Profile_ptr (new To (self_)));
268 istream is (data, size, 1); // Always little-endian.
270 is >> msg_size;
272 while (true)
274 u16 id, size;
276 if (!((is >> id) && (is >> size))) break;
278 //cerr << 6 << "reading profile with id " << id << " "
279 // << size << " bytes long" << endl;
281 Profile::Header hdr (id, size);
283 if (id == SN::id)
285 m->add (Profile_ptr (new SN (hdr, is)));
287 else if (id == Data::id)
289 m->add (Profile_ptr (new Data (hdr, is)));
291 else if (id == NAK::id)
293 m->add (Profile_ptr (new NAK (hdr, is)));
295 else if (id == NRTM::id)
297 m->add (Profile_ptr (new NRTM (hdr, is)));
299 else if (id == NoData::id)
301 m->add (Profile_ptr (new NoData (hdr, is)));
303 else if (id == Part::id)
305 m->add (Profile_ptr (new Part (hdr, is)));
307 else
309 //cerr << 0 << "unknown profile id " << hdr.id () << endl;
310 ACE_OS::abort ();
314 in_->recv (m);
318 ACE_THR_FUNC_RETURN Link::
319 recv_thunk (void* obj)
321 reinterpret_cast<Link*> (obj)->recv ();
322 return 0;
325 void Link::recv (Message_ptr)
327 ACE_OS::abort ();