fixed: gcc8 compile issues
[opensg.git] / Source / Base / Network / Socket / OSGGroupMCastConnection.cpp
bloba3ed6fcb251efda35718849bbbe27115fc35b963
1 /*---------------------------------------------------------------------------*\
2 * OpenSG *
3 * *
4 * *
5 * Copyright (C) 2000-2002 by the OpenSG Forum *
6 * *
7 * www.opensg.org *
8 * *
9 * contact: dirk@opensg.org, gerrit.voss@vossg.org, jbehr@zgdv.de *
10 * *
11 \*---------------------------------------------------------------------------*/
12 /*---------------------------------------------------------------------------*\
13 * License *
14 * *
15 * This library is free software; you can redistribute it and/or modify it *
16 * under the terms of the GNU Library General Public License as published *
17 * by the Free Software Foundation, version 2. *
18 * *
19 * This library is distributed in the hope that it will be useful, but *
20 * WITHOUT ANY WARRANTY; without even the implied warranty of *
21 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *
22 * Library General Public License for more details. *
23 * *
24 * You should have received a copy of the GNU Library General Public *
25 * License along with this library; if not, write to the Free Software *
26 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. *
27 * *
28 \*---------------------------------------------------------------------------*/
29 /*---------------------------------------------------------------------------*\
30 * Changes *
31 * *
32 * *
33 * *
34 * *
35 * *
36 * *
37 \*---------------------------------------------------------------------------*/
39 //---------------------------------------------------------------------------
40 // Includes
41 //---------------------------------------------------------------------------
42 #include <cstdlib>
43 #include <cstdio>
44 #include <cassert>
46 #include <set>
47 #include <sstream>
49 #include "OSGConfig.h"
50 #include "OSGLog.h"
51 #include "OSGBaseThread.h"
52 #include "OSGSocketSelection.h"
53 #include "OSGBinaryMessage.h"
54 #include "OSGGroupMCastConnection.h"
55 #include "OSGConnectionType.h"
57 #define USE_EARLY_SEND
59 OSG_USING_NAMESPACE
61 /** \class OSG::GroupMCastConnection
62 **/
64 /*-------------------------------------------------------------------------*/
65 /* constructor destructor */
67 /*! Constructor
70 GroupMCastConnection::GroupMCastConnection():
71 Inherited ( ),
72 _mcastSocket ( ),
73 _mcastAddress ( ),
74 _sendQueueThread (NULL ),
75 _sendQueueThreadRunning(false),
76 _sendQueueThreadStop (false),
77 _queue ( ),
78 _free ( ),
79 _lock (NULL ),
80 _seqNumber (0 ),
81 _receivers (0 ),
82 _windowSize (0 ),
83 _receiver ( ),
84 _waitFor ( ),
85 _initialized (false)
87 char lockName[256];
88 sprintf(lockName,"GroupMCastConnection%p", static_cast<void *>(this));
90 // create locks
91 _lock = Lock::get(lockName, false);
92 // fill dgramqueue
93 for(UInt32 dI = 0 ; dI < OSG_DGRAM_QUEUE_LEN ; ++dI)
94 _free.put(new Dgram());
95 // prepare mcast socket
96 _mcastSocket.open();
97 _mcastSocket.bind();
98 _mcastSocket.setReadBufferSize(262144);
99 // set window size
100 _windowSize = _mcastSocket.getReadBufferSize()/(OSG_DGRAM_LEN) - 1;
101 _windowSize = osgMax(UInt32(2),_windowSize);
104 /*! Destructor
106 GroupMCastConnection::~GroupMCastConnection(void)
108 Dgram *dgram;
109 _sendQueueThreadStop = true;
111 // get free dgram
112 _lock->acquire();
113 dgram = _free.get(_lock);
114 dgram->setSize(0);
115 _queue.put(dgram);
116 _lock->release();
117 // wait for stop
118 if(_sendQueueThread)
120 BaseThread::join(_sendQueueThread);
122 _sendQueueThread = NULL;
124 // close socket
125 _mcastSocket.close();
126 // free queues
127 _lock->acquire();
128 while(!_free.empty())
129 delete _free.get(_lock);
130 while(!_queue.empty())
131 delete _queue.get(_lock);
132 _lock->release();
134 _lock = NULL;
137 /*! get connection type
139 const ConnectionType *GroupMCastConnection::getType()
141 return &_type;
144 /*-------------------------------------------------------------------------*/
145 /* connection */
147 /*! connect to the given point. If timeout is reached, -1 is
148 returned
150 GroupConnection::Channel GroupMCastConnection::connectPoint(
151 const std::string &address,
152 Time timeout)
154 Channel channel = Inherited::connectPoint(address,timeout);
155 return channel;
158 /*! disconnect the given channel
160 void GroupMCastConnection::disconnect(Channel channel)
162 ChannelIndex index = channelToIndex(channel);
164 Inherited::disconnect(channel);
165 _lock->acquire();
167 // remove channel from _receiver/_waitFor
168 std::vector<SocketAddress>::iterator rIt = _receiver.begin() + index;
169 std::vector<SocketAddress>::iterator wIt = std::find(_waitFor.begin(),
170 _waitFor.end (),
171 _receiver[index] );
173 if(wIt != _waitFor.end())
174 _waitFor.erase(wIt);
176 _receiver.erase(rIt);
178 _lock->release();
181 /*! accept an icomming point connection. If timeout is reached,
182 -1 is returned. If timeout is -1 then wait without timeout
184 GroupConnection::Channel GroupMCastConnection::acceptPoint(Time timeout)
186 Connection::Channel channel = Inherited::acceptPoint(timeout);
187 return channel;
190 /*! parse the params string.
192 void GroupMCastConnection::setParams(const std::string &params)
194 if(params.empty())
195 return;
197 Inherited::setParams(params);
199 std::string option = "TTL=";
200 std::string::size_type i = 0;
201 if((i=params.find(option)) != std::string::npos)
203 std::string str = params.substr(i + option.size());
205 std::stringstream ss;
206 std::string::size_type j = 0;
207 while(j < str.length() && str[j] != ',' && isdigit(str[j]))
209 ss << str[j++];
211 UInt32 ttl;
212 ss >> ttl;
213 if(ttl > 255)
214 ttl = 255;
215 _mcastSocket.setTTL(static_cast<unsigned char>(ttl));
216 FINFO(("GroupMCastConnection::setParams : setting ttl to %u.\n", ttl));
220 /*-------------------------------------------------------------------------*/
221 /* sync */
223 /*! wait for signal
225 bool GroupMCastConnection::wait(Time timeout) OSG_THROW (ReadError)
227 // todo
228 return Inherited::wait(timeout);
231 /*! send signal
233 void GroupMCastConnection::signal(void) OSG_THROW (WriteError)
235 UInt32 tag=314156;
236 putValue(tag);
237 flush();
240 /*-------------------------- helpers --------------------------------------*/
242 /** \brief create conneciton
245 GroupConnection *GroupMCastConnection::create(void)
247 return new GroupMCastConnection();
250 /*-------------------------------------------------------------------------*/
251 /* read write */
253 /** Write data to all destinations
255 * \param mem Pointer to data buffer
256 * \param size Size of bytes to write
260 void GroupMCastConnection::write(MemoryHandle mem,UInt32 size)
262 Dgram *dgram = NULL;;
263 UInt32 pos = 0;
264 char *buffer = reinterpret_cast<char*>(mem);
266 if(!_initialized)
267 initialize();
269 while(size)
271 // get free dgram
272 _lock->acquire();
273 dgram = _free.get(_lock);
274 _lock->release();
275 // fill with data
276 dgram->setSize(osgMin(size,dgram->getCapacity()));
277 memcpy(dgram->getData(),buffer+pos,dgram->getSize());
278 // set sequence number
279 dgram->setId( _seqNumber++ );
280 // prepate next block
281 size -= dgram->getSize();
282 pos += dgram->getSize();
283 // put to write queue
284 _lock->acquire();
285 if(!_sendQueueThreadRunning)
287 _lock->release();
288 BaseThread::join(_sendQueueThread);
289 throw WriteError("Channel closed");
291 #ifdef USE_EARLY_SEND
292 if(_queue.waiting())
294 dgram->setEarlySend(true);
295 _mcastSocket.sendTo(dgram->getBuffer(),
296 dgram->getBufferSize(),
297 _mcastAddress);
299 #endif
300 _queue.put(dgram);
301 _lock->release();
305 /** Write buffer
307 * Write blocksize and data.
310 void GroupMCastConnection::writeBuffer(void)
312 UInt32 size = writeBufBegin()->getDataSize();
313 MemoryHandle buffer = writeBufBegin()->getMem();
315 write(buffer,size);
318 /*-------------------------------------------------------------------------*/
319 /* private helpers */
321 /*! check if all receivers are alive
323 bool GroupMCastConnection::checkChannels(void)
325 SocketSelection selection;
326 UInt32 index;
327 bool valid=true;
329 _lock->acquire();
330 for(index = 0 ; index < _sockets.size() ; ++index)
331 selection.setRead(_sockets[index]);
332 if(selection.select(0))
334 char buffer;
335 for(index = 0 ; index < _sockets.size() ; ++index)
337 if(selection.isSetRead(_sockets[index]))
341 _sockets[index].send(&buffer,1);
343 catch(SocketException &e)
345 valid = false;
346 FWARNING(("Socket error:%s\n",e.what()))
347 break;
352 _lock->release();
353 return valid;
356 /*! Send current write queue
// Send loop executed by the send thread (called from sendQueueThread()).
// Datagrams are taken from _queue into a ring of _windowSize slots, sent to
// _mcastAddress, and handed back to _free once every address in _waitFor
// has acknowledged them.
// Returns true when a size 0 datagram (the stop marker) is read while the
// window is empty, or when the outer loop ends; returns false when
// checkChannels() reports a failure.
// NOTE(review): this listing has lost its brace-only lines; the trailing
// while(...) at 571 looks like the end of a do-loop starting before
// 390 - confirm against the repository.
358 bool GroupMCastConnection::sendQueue(void)
// dgram[i]   : datagram held in window slot i
// missing[i] : addresses that have not yet acknowledged slot i
360 std::vector<Dgram*> dgram;
361 std::vector<std::set<SocketAddress> > missing;
// count : number of datagrams currently in the window (at most maxCount)
362 UInt32 count = 0;
363 UInt32 maxCount = _windowSize-1;
// ring positions: ack = oldest unacknowledged slot, end = next slot to
// fill, send = next slot to transmit
364 UInt32 ack = 0;
365 UInt32 end = 0;
366 UInt32 send = 0;
367 UInt32 index;
368 Dgram response;
369 UInt32 m;
370 bool readable = false;
371 Time lastAckTime=0;
372 UInt32 len;
373 SocketAddress fromAddress;
374 Dgram ackRequest;
// idle time without a response after which an ack request is sent
375 const Time ackTimeout = 0.01;
// id of the datagram visited last by the send loop
376 UInt16 sendId=0;
// id and time of the last handled nak, used to skip repeated naks
377 UInt16 lastNak=0;
378 Time lastNakTime=0;
379 bool stopAfterSend=false;
381 // prepare buffers
382 dgram.resize(_windowSize);
383 missing.resize(_windowSize);
385 #ifdef TEST_LOST_DGRAM_RATE
386 srand48((int)getSystemTime());
387 #endif
390 // read new dgrams
// with an empty window (count == 0) _queue.get() is called even when the
// queue is empty; otherwise only already queued datagrams are taken
391 if(count < maxCount)
393 _lock->acquire();
394 while(count < maxCount &&
395 (count == 0 || !_queue.empty()))
397 dgram[end] = _queue.get(_lock);
398 // stop
// a datagram of size 0 is the stop marker queued by the destructor
399 if(!dgram[end]->getSize())
401 if(count)
403 // wait for ack of packages in window
404 stopAfterSend = true;
405 break;
407 else
409 // no packages in the window
410 _lock->release();
411 return true;
414 // insert to expected responses
415 for(index=0 ; index<_waitFor.size() ; ++index)
416 missing[end].insert(_waitFor[index]);
417 end = (end+1) % _windowSize;
418 if(!count)
419 lastAckTime = getSystemTime();
420 count++;
422 _lock->release();
425 // send all dgrams in current window
426 for( ; send != end ; send = (send+1) % _windowSize)
428 #ifdef TEST_LOST_DGRAM_RATE
429 if(drand48()>TEST_LOST_DGRAM_RATE)
430 #endif
// datagrams flagged as early-sent by write() are not transmitted again here
432 if(!dgram[send]->getEarlySend())
434 _mcastSocket.sendTo(dgram[send]->getBuffer(),
435 dgram[send]->getBufferSize(),
436 _mcastAddress);
// clear the flag so that a later retransmission is sent normally
439 dgram[send]->setEarlySend(false);
441 sendId = dgram[send]->getId();
444 // FLOG(("Sending dgram %d at id %d\n", send, dgram[send]->getId()));
447 // loop while
448 // window is full and nothing to send or
449 // queue is empty
450 // or there is something to read
451 while(count &&
452 ( ( readable = _mcastSocket.waitReadable(0) ) ||
453 ( count == maxCount && send == end ) ||
454 ( getSystemTime() - lastAckTime > ackTimeout) ) )
// nothing readable within ackTimeout: ask explicitly for acks
456 if(!readable && !_mcastSocket.waitReadable(ackTimeout))
458 #if 0
459 printf("count %d\n",count);
460 printf("missing %d\n",missing[ack].size());
462 printf("readable %d\n",readable);
463 printf("send %d end %d\n",send,end);
464 printf("lastack %lf\n",getSystemTime() - lastAckTime);
465 #endif
466 FDEBUG(("timeout count %d %d missing %" PRISize "\n",
467 count,sendId,missing[ack].size()))
468 // printf("%.10f timeout count %d %d missing %d\n",getSystemTime()-t1,count,sendId,missing[ack].size());
// the ack request is a datagram of size 0 carrying the id of the last
// datagram visited by the send loop
470 ackRequest.setSize(0);
471 ackRequest.setId(sendId);
473 // send request over multicast
474 _mcastSocket.sendTo(ackRequest.getBuffer(),
475 ackRequest.getBufferSize(),
476 _mcastAddress);
478 // wait until next ack request
479 _mcastSocket.waitReadable(0.05);
481 // check channels
482 if(getSystemTime() - lastAckTime > 0.5)
484 if(!checkChannels())
485 return false;
487 if(_sendQueueThreadStop &&
488 getSystemTime() - lastAckTime > 1)
489 // linger max 1 sec after close
490 break;
491 else
492 // retry wait
493 continue;
496 // read response
497 #if 0
498 len = _mcastSocket.recvFrom(response.getBuffer(),
499 response.getBufferCapacity(),
500 fromAddress);
501 #endif
502 len = _mcastSocket.recvFrom(response,
503 fromAddress);
504 lastAckTime = getSystemTime();
506 // ignore response with wrong len
507 response.setResponseSize();
508 if(len != response.getBufferSize())
510 FDEBUG(("Wrong response len %d\n",len))
511 continue;
514 // old ack ?
// only responses whose id is not less than the oldest window id are handled
515 if(!Dgram::less(response.getId(),dgram[ack]->getId()))
517 // first ack for this dgram from this receiver
518 if(response.getResponseAck() == true)
520 // FLOG(("Ack %d from %s:%d\n",
521 // response.getId(),
522 // fromAddress.getHost().c_str(),
523 // fromAddress.getPort() ));
// the ack is cumulative: the sender is removed from every slot from ack up
// to and including the slot holding the acknowledged id
// NOTE(review): the walk only stops on an id match; an id that is in no
// slot would not terminate it - confirm receivers cannot send one
525 for(m = ack ;
526 dgram[m]->getId() != response.getId() ;
527 m=(m+1) % _windowSize)
528 missing[m].erase(fromAddress);
529 missing[m].erase(fromAddress);
531 else
// nak: skip a repetition of the same id within 0.02 time units
533 if(response.getId() == lastNak &&
534 getSystemTime() - lastNakTime < 0.02)
535 continue;
536 lastNak = response.getId();
537 lastNakTime = getSystemTime();
539 // FLOG(("Nack %d from %s:%d\n",
540 // response.getId(),
541 // fromAddress.getHost().c_str(),
542 // fromAddress.getPort() ));
544 // retransmit
// move the send position back to the slot with the rejected id (or leave
// it unchanged if that id is not found before send)
545 m = ack;
547 while(m != send && dgram[m]->getId() != response.getId())
549 m = (m + 1) % _windowSize;
552 send = m;
556 // free acknowledged packets
557 if(missing[ack].empty())
559 // printf("ack %d\n",dgram[ack]->getId());
560 _lock->acquire();
561 while(count && missing[ack].empty())
563 _free.put(dgram[ack]);
564 ack = (ack+1) % _windowSize;
565 count--;
567 _lock->release();
// repeat until the stop marker was seen and the send position has reached
// the end of the window
571 while(!stopAfterSend || send != end);
573 return true;
576 /*! write thread
578 void GroupMCastConnection::sendQueueThread(void *arg)
580 GroupMCastConnection *the = static_cast<GroupMCastConnection *>(arg);
583 the->sendQueue();
585 catch(SocketException &e)
587 FFATAL(( "Writer Proc crashed %s\n",e.what() ));
589 the->_sendQueueThreadRunning = false;
592 /*! initialize connection. Connect to all points
594 void GroupMCastConnection::initialize()
596 std::string group = "239.33.42.32";
597 // std::string group = "146.140.32.7";
598 // std::string group = "146.140.32.255";
599 int port = 15356;
600 SizeT pos = _destination.find(':');
601 int clientPort;
602 std::string clientHost;
603 UInt32 index;
604 UInt32 len;
605 UInt32 ackNum = UInt32(osgSqrt(Real32(_sockets.size())));
606 UInt32 numSource;
607 UInt32 sendTo;
608 BinaryMessage message;
609 char threadName[256];
611 sprintf(threadName,"GroupMCastConnection%p", static_cast<void *>(this));
613 if(!getDestination().empty())
614 group = getDestination();
616 if(_sockets.size()<=16)
617 ackNum = 1;
619 if(pos != std::string::npos)
621 group = std::string(_destination,0,pos);
622 port = atoi(std::string(_destination,
623 pos+1,
624 std::string::npos).c_str());
626 else
628 if(_destination.size())
629 group = _destination;
631 _mcastAddress.setHost(group);
632 _mcastAddress.setPort(port );
634 // set multicast interface, if given
635 if(!getInterface().empty())
636 _mcastSocket.setMCastInterface(
637 SocketAddress(getInterface().c_str()));
639 for(index = 0 ; index < _sockets.size() ; ++index)
641 message.clear();
642 // tell the point connection the multicast address
643 message.putString(_mcastAddress.getHost());
644 message.putUInt32(_mcastAddress.getPort());
645 // tell the current seq number
646 message.putUInt32(_seqNumber);
647 // tell the point from wich port requests are comming
648 message.putUInt32(_mcastSocket.getAddress().getPort());
649 // send the message
650 _sockets[index].send(message);
652 // receive destination address
653 message.clear();
654 len = _sockets[index].recv(message);
655 if(len == 0)
656 throw ReadError("Channel closed\n");
657 clientHost = _remoteAddresses[index].getHost();
658 clientPort = message.getUInt32();
660 SINFO << "Server:" << clientHost
661 << " Port:" << clientPort << std::endl;
662 _receiver.push_back(SocketAddress(clientHost.c_str(),clientPort));
664 for(index = 0 ; index < _sockets.size() ; ++index)
666 message.clear();
667 // tell receivers, whom to report acks
668 if((index % ackNum) == 0)
670 _waitFor.push_back(_receiver[index]);
671 numSource = osgMin( ackNum-1, UInt32(_sockets.size()-index-1) );
672 message.putUInt32(numSource);
673 for(UInt32 r = index+1 ; r < index+1+numSource ; ++r)
675 message.putString(_receiver[r].getHost());
676 message.putUInt32(_receiver[r].getPort());
678 message.putString("");
679 message.putUInt32(_mcastSocket.getAddress().getPort());
681 else
683 sendTo = index - (index % ackNum);
684 message.putUInt32(0);
685 message.putString(_receiver[sendTo].getHost());
686 message.putUInt32(_receiver[sendTo].getPort());
688 // send the message
689 _sockets[index].send(message);
692 // start write thread
693 _sendQueueThread=BaseThread::get(threadName, false);
694 _sendQueueThreadRunning = true;
695 _sendQueueThreadStop = false;
696 _sendQueueThread->runFunction( sendQueueThread, static_cast<void *>(this) );
697 _initialized = true;
700 /*-------------------------------------------------------------------------*/
701 /* static type */
703 ConnectionType GroupMCastConnection::_type(
704 &GroupMCastConnection::create,
705 "Multicast");