1 /*---------------------------------------------------------------------------*\
5 * Copyright (C) 2000-2002 by the OpenSG Forum *
9 * contact: dirk@opensg.org, gerrit.voss@vossg.org, jbehr@zgdv.de *
11 \*---------------------------------------------------------------------------*/
12 /*---------------------------------------------------------------------------*\
15 * This library is free software; you can redistribute it and/or modify it *
16 * under the terms of the GNU Library General Public License as published *
17 * by the Free Software Foundation, version 2. *
19 * This library is distributed in the hope that it will be useful, but *
20 * WITHOUT ANY WARRANTY; without even the implied warranty of *
21 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *
22 * Library General Public License for more details. *
24 * You should have received a copy of the GNU Library General Public *
25 * License along with this library; if not, write to the Free Software *
26 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. *
28 \*---------------------------------------------------------------------------*/
29 /*---------------------------------------------------------------------------*\
37 \*---------------------------------------------------------------------------*/
39 //---------------------------------------------------------------------------
41 //---------------------------------------------------------------------------
49 #include "OSGConfig.h"
51 #include "OSGBaseThread.h"
52 #include "OSGSocketSelection.h"
53 #include "OSGBinaryMessage.h"
54 #include "OSGGroupMCastConnection.h"
55 #include "OSGConnectionType.h"
57 #define USE_EARLY_SEND
61 /** \class OSG::GroupMCastConnection
64 /*-------------------------------------------------------------------------*/
65 /* constructor destructor */
70 GroupMCastConnection::GroupMCastConnection():
74 _sendQueueThread (NULL
),
75 _sendQueueThreadRunning(false),
76 _sendQueueThreadStop (false),
88 sprintf(lockName
,"GroupMCastConnection%p", static_cast<void *>(this));
91 _lock
= Lock::get(lockName
, false);
93 for(UInt32 dI
= 0 ; dI
< OSG_DGRAM_QUEUE_LEN
; ++dI
)
94 _free
.put(new Dgram());
95 // prepare mcast socket
98 _mcastSocket
.setReadBufferSize(262144);
100 _windowSize
= _mcastSocket
.getReadBufferSize()/(OSG_DGRAM_LEN
) - 1;
101 _windowSize
= osgMax(UInt32(2),_windowSize
);
106 GroupMCastConnection::~GroupMCastConnection(void)
109 _sendQueueThreadStop
= true;
113 dgram
= _free
.get(_lock
);
120 BaseThread::join(_sendQueueThread
);
122 _sendQueueThread
= NULL
;
125 _mcastSocket
.close();
128 while(!_free
.empty())
129 delete _free
.get(_lock
);
130 while(!_queue
.empty())
131 delete _queue
.get(_lock
);
137 /*! get connection type
139 const ConnectionType
*GroupMCastConnection::getType()
144 /*-------------------------------------------------------------------------*/
147 /*! connect to the given point. If timeout is reached, -1 is
150 GroupConnection::Channel
GroupMCastConnection::connectPoint(
151 const std::string
&address
,
154 Channel channel
= Inherited::connectPoint(address
,timeout
);
158 /*! disconnect the given channel
160 void GroupMCastConnection::disconnect(Channel channel
)
162 ChannelIndex index
= channelToIndex(channel
);
164 Inherited::disconnect(channel
);
167 // remove channel from _receiver/_waitFor
168 std::vector
<SocketAddress
>::iterator rIt
= _receiver
.begin() + index
;
169 std::vector
<SocketAddress
>::iterator wIt
= std::find(_waitFor
.begin(),
173 if(wIt
!= _waitFor
.end())
176 _receiver
.erase(rIt
);
181 /*! accept an icomming point connection. If timeout is reached,
182 -1 is returned. If timeout is -1 then wait without timeout
184 GroupConnection::Channel
GroupMCastConnection::acceptPoint(Time timeout
)
186 Connection::Channel channel
= Inherited::acceptPoint(timeout
);
190 /*! parse the params string.
192 void GroupMCastConnection::setParams(const std::string
¶ms
)
197 Inherited::setParams(params
);
199 std::string option
= "TTL=";
200 std::string::size_type i
= 0;
201 if((i
=params
.find(option
)) != std::string::npos
)
203 std::string str
= params
.substr(i
+ option
.size());
205 std::stringstream ss
;
206 std::string::size_type j
= 0;
207 while(j
< str
.length() && str
[j
] != ',' && isdigit(str
[j
]))
215 _mcastSocket
.setTTL(static_cast<unsigned char>(ttl
));
216 FINFO(("GroupMCastConnection::setParams : setting ttl to %u.\n", ttl
));
220 /*-------------------------------------------------------------------------*/
225 bool GroupMCastConnection::wait(Time timeout
) OSG_THROW (ReadError
)
228 return Inherited::wait(timeout
);
233 void GroupMCastConnection::signal(void) OSG_THROW (WriteError
)
240 /*-------------------------- helpers --------------------------------------*/
242 /** \brief create conneciton
245 GroupConnection
*GroupMCastConnection::create(void)
247 return new GroupMCastConnection();
250 /*-------------------------------------------------------------------------*/
253 /** Write data to all destinations
255 * \param mem Pointer to data buffer
256 * \param size Size of bytes to write
260 void GroupMCastConnection::write(MemoryHandle mem
,UInt32 size
)
262 Dgram
*dgram
= NULL
;;
264 char *buffer
= reinterpret_cast<char*>(mem
);
273 dgram
= _free
.get(_lock
);
276 dgram
->setSize(osgMin(size
,dgram
->getCapacity()));
277 memcpy(dgram
->getData(),buffer
+pos
,dgram
->getSize());
278 // set sequence number
279 dgram
->setId( _seqNumber
++ );
280 // prepate next block
281 size
-= dgram
->getSize();
282 pos
+= dgram
->getSize();
283 // put to write queue
285 if(!_sendQueueThreadRunning
)
288 BaseThread::join(_sendQueueThread
);
289 throw WriteError("Channel closed");
291 #ifdef USE_EARLY_SEND
294 dgram
->setEarlySend(true);
295 _mcastSocket
.sendTo(dgram
->getBuffer(),
296 dgram
->getBufferSize(),
307 * Write blocksize and data.
310 void GroupMCastConnection::writeBuffer(void)
312 UInt32 size
= writeBufBegin()->getDataSize();
313 MemoryHandle buffer
= writeBufBegin()->getMem();
318 /*-------------------------------------------------------------------------*/
319 /* private helpers */
321 /*! check if all receivers are alive
323 bool GroupMCastConnection::checkChannels(void)
325 SocketSelection selection
;
330 for(index
= 0 ; index
< _sockets
.size() ; ++index
)
331 selection
.setRead(_sockets
[index
]);
332 if(selection
.select(0))
335 for(index
= 0 ; index
< _sockets
.size() ; ++index
)
337 if(selection
.isSetRead(_sockets
[index
]))
341 _sockets
[index
].send(&buffer
,1);
343 catch(SocketException
&e
)
346 FWARNING(("Socket error:%s\n",e
.what()))
356 /*! Send current write queue
358 bool GroupMCastConnection::sendQueue(void)
360 std::vector
<Dgram
*> dgram
;
361 std::vector
<std::set
<SocketAddress
> > missing
;
363 UInt32 maxCount
= _windowSize
-1;
370 bool readable
= false;
373 SocketAddress fromAddress
;
375 const Time ackTimeout
= 0.01;
379 bool stopAfterSend
=false;
382 dgram
.resize(_windowSize
);
383 missing
.resize(_windowSize
);
385 #ifdef TEST_LOST_DGRAM_RATE
386 srand48((int)getSystemTime());
394 while(count
< maxCount
&&
395 (count
== 0 || !_queue
.empty()))
397 dgram
[end
] = _queue
.get(_lock
);
399 if(!dgram
[end
]->getSize())
403 // wait for ack of packages in window
404 stopAfterSend
= true;
409 // no packages in the window
414 // insert to expected responses
415 for(index
=0 ; index
<_waitFor
.size() ; ++index
)
416 missing
[end
].insert(_waitFor
[index
]);
417 end
= (end
+1) % _windowSize
;
419 lastAckTime
= getSystemTime();
425 // send all dgrams in current window
426 for( ; send
!= end
; send
= (send
+1) % _windowSize
)
428 #ifdef TEST_LOST_DGRAM_RATE
429 if(drand48()>TEST_LOST_DGRAM_RATE
)
432 if(!dgram
[send
]->getEarlySend())
434 _mcastSocket
.sendTo(dgram
[send
]->getBuffer(),
435 dgram
[send
]->getBufferSize(),
439 dgram
[send
]->setEarlySend(false);
441 sendId
= dgram
[send
]->getId();
444 // FLOG(("Sending dgram %d at id %d\n", send, dgram[send]->getId()));
448 // window is full and nothing to send or
450 // or there is something to read
452 ( ( readable
= _mcastSocket
.waitReadable(0) ) ||
453 ( count
== maxCount
&& send
== end
) ||
454 ( getSystemTime() - lastAckTime
> ackTimeout
) ) )
456 if(!readable
&& !_mcastSocket
.waitReadable(ackTimeout
))
459 printf("count %d\n",count
);
460 printf("missing %d\n",missing
[ack
].size());
462 printf("readable %d\n",readable
);
463 printf("send %d end %d\n",send
,end
);
464 printf("lastack %lf\n",getSystemTime() - lastAckTime
);
466 FDEBUG(("timeout count %d %d missing %" PRISize
"\n",
467 count
,sendId
,missing
[ack
].size()))
468 // printf("%.10f timeout count %d %d missing %d\n",getSystemTime()-t1,count,sendId,missing[ack].size());
470 ackRequest
.setSize(0);
471 ackRequest
.setId(sendId
);
473 // send request over multicast
474 _mcastSocket
.sendTo(ackRequest
.getBuffer(),
475 ackRequest
.getBufferSize(),
478 // wait until next ack request
479 _mcastSocket
.waitReadable(0.05);
482 if(getSystemTime() - lastAckTime
> 0.5)
487 if(_sendQueueThreadStop
&&
488 getSystemTime() - lastAckTime
> 1)
489 // linger max 1 sec after close
498 len
= _mcastSocket
.recvFrom(response
.getBuffer(),
499 response
.getBufferCapacity(),
502 len
= _mcastSocket
.recvFrom(response
,
504 lastAckTime
= getSystemTime();
506 // ignore response with wrong len
507 response
.setResponseSize();
508 if(len
!= response
.getBufferSize())
510 FDEBUG(("Wrong response len %d\n",len
))
515 if(!Dgram::less(response
.getId(),dgram
[ack
]->getId()))
517 // first ack for this dgram from this receiver
518 if(response
.getResponseAck() == true)
520 // FLOG(("Ack %d from %s:%d\n",
522 // fromAddress.getHost().c_str(),
523 // fromAddress.getPort() ));
526 dgram
[m
]->getId() != response
.getId() ;
527 m
=(m
+1) % _windowSize
)
528 missing
[m
].erase(fromAddress
);
529 missing
[m
].erase(fromAddress
);
533 if(response
.getId() == lastNak
&&
534 getSystemTime() - lastNakTime
< 0.02)
536 lastNak
= response
.getId();
537 lastNakTime
= getSystemTime();
539 // FLOG(("Nack %d from %s:%d\n",
541 // fromAddress.getHost().c_str(),
542 // fromAddress.getPort() ));
547 while(m
!= send
&& dgram
[m
]->getId() != response
.getId())
549 m
= (m
+ 1) % _windowSize
;
556 // free acknolaged packes
557 if(missing
[ack
].empty())
559 // printf("ack %d\n",dgram[ack]->getId());
561 while(count
&& missing
[ack
].empty())
563 _free
.put(dgram
[ack
]);
564 ack
= (ack
+1) % _windowSize
;
571 while(!stopAfterSend
|| send
!= end
);
578 void GroupMCastConnection::sendQueueThread(void *arg
)
580 GroupMCastConnection
*the
= static_cast<GroupMCastConnection
*>(arg
);
585 catch(SocketException
&e
)
587 FFATAL(( "Writer Proc crashed %s\n",e
.what() ));
589 the
->_sendQueueThreadRunning
= false;
592 /*! initialize connection. Connect to all points
594 void GroupMCastConnection::initialize()
596 std::string group
= "239.33.42.32";
597 // std::string group = "146.140.32.7";
598 // std::string group = "146.140.32.255";
600 SizeT pos
= _destination
.find(':');
602 std::string clientHost
;
605 UInt32 ackNum
= UInt32(osgSqrt(Real32(_sockets
.size())));
608 BinaryMessage message
;
609 char threadName
[256];
611 sprintf(threadName
,"GroupMCastConnection%p", static_cast<void *>(this));
613 if(!getDestination().empty())
614 group
= getDestination();
616 if(_sockets
.size()<=16)
619 if(pos
!= std::string::npos
)
621 group
= std::string(_destination
,0,pos
);
622 port
= atoi(std::string(_destination
,
624 std::string::npos
).c_str());
628 if(_destination
.size())
629 group
= _destination
;
631 _mcastAddress
.setHost(group
);
632 _mcastAddress
.setPort(port
);
634 // set multicast interface, if given
635 if(!getInterface().empty())
636 _mcastSocket
.setMCastInterface(
637 SocketAddress(getInterface().c_str()));
639 for(index
= 0 ; index
< _sockets
.size() ; ++index
)
642 // tell the point connection the multicast address
643 message
.putString(_mcastAddress
.getHost());
644 message
.putUInt32(_mcastAddress
.getPort());
645 // tell the current seq number
646 message
.putUInt32(_seqNumber
);
647 // tell the point from wich port requests are comming
648 message
.putUInt32(_mcastSocket
.getAddress().getPort());
650 _sockets
[index
].send(message
);
652 // receive destination address
654 len
= _sockets
[index
].recv(message
);
656 throw ReadError("Channel closed\n");
657 clientHost
= _remoteAddresses
[index
].getHost();
658 clientPort
= message
.getUInt32();
660 SINFO
<< "Server:" << clientHost
661 << " Port:" << clientPort
<< std::endl
;
662 _receiver
.push_back(SocketAddress(clientHost
.c_str(),clientPort
));
664 for(index
= 0 ; index
< _sockets
.size() ; ++index
)
667 // tell receivers, whom to report acks
668 if((index
% ackNum
) == 0)
670 _waitFor
.push_back(_receiver
[index
]);
671 numSource
= osgMin( ackNum
-1, UInt32(_sockets
.size()-index
-1) );
672 message
.putUInt32(numSource
);
673 for(UInt32 r
= index
+1 ; r
< index
+1+numSource
; ++r
)
675 message
.putString(_receiver
[r
].getHost());
676 message
.putUInt32(_receiver
[r
].getPort());
678 message
.putString("");
679 message
.putUInt32(_mcastSocket
.getAddress().getPort());
683 sendTo
= index
- (index
% ackNum
);
684 message
.putUInt32(0);
685 message
.putString(_receiver
[sendTo
].getHost());
686 message
.putUInt32(_receiver
[sendTo
].getPort());
689 _sockets
[index
].send(message
);
692 // start write thread
693 _sendQueueThread
=BaseThread::get(threadName
, false);
694 _sendQueueThreadRunning
= true;
695 _sendQueueThreadStop
= false;
696 _sendQueueThread
->runFunction( sendQueueThread
, static_cast<void *>(this) );
700 /*-------------------------------------------------------------------------*/
703 ConnectionType
GroupMCastConnection::_type(
704 &GroupMCastConnection::create
,