Merge branch 'main/rendor-staging' into fixes
[ryzomcore.git] / nel / src / net / buf_server.cpp
blob8126218417cd8f936932019045006846ab5242ee
1 // NeL - MMORPG Framework <http://dev.ryzom.com/projects/nel/>
2 // Copyright (C) 2010 Winch Gate Property Limited
3 //
4 // This source file has been modified by the following contributors:
5 // Copyright (C) 2014-2016 Jan BOON (Kaetemi) <jan.boon@kaetemi.be>
6 //
7 // This program is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Affero General Public License as
9 // published by the Free Software Foundation, either version 3 of the
10 // License, or (at your option) any later version.
12 // This program is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU Affero General Public License for more details.
17 // You should have received a copy of the GNU Affero General Public License
18 // along with this program. If not, see <http://www.gnu.org/licenses/>.
20 #include "stdnet.h"
22 #include "nel/misc/hierarchical_timer.h"
24 #include "nel/net/buf_server.h"
25 #include "nel/net/net_log.h"
27 #ifdef NL_OS_WINDOWS
28 # ifndef NL_COMP_MINGW
29 # define NOMINMAX
30 # endif
31 # include <windows.h>
32 #elif defined NL_OS_UNIX
33 # include <unistd.h>
34 # include <cerrno>
35 # include <sys/types.h>
36 # include <sys/time.h>
37 #endif
/*
 * On Linux, the default limit of descriptors is usually 1024; you can increase it with ulimit
 */
43 using namespace NLMISC;
44 using namespace std;
46 namespace NLNET {
48 uint32 NbServerListenTask = 0;
49 uint32 NbServerReceiveTask = 0;
/***************************************************************************************************
 * User main thread (initialization)
 **************************************************************************************************/
56 * Constructor
58 CBufServer::CBufServer( TThreadStategy strategy,
59 uint16 max_threads, uint16 max_sockets_per_thread, bool nodelay, bool replaymode, bool initPipeForDataAvailable ) :
60 #ifdef NL_OS_UNIX
61 CBufNetBase( initPipeForDataAvailable ),
62 #else
63 CBufNetBase(),
64 #endif
65 _ThreadStrategy( strategy ),
66 _MaxThreads( max_threads ),
67 _MaxSocketsPerThread( max_sockets_per_thread ),
68 _ListenTask( NULL ),
69 _ListenThread( NULL ),
70 _ThreadPool("CBufServer::_ThreadPool"),
71 _ConnectionCallback( NULL ),
72 _ConnectionCbArg( NULL ),
73 _BytesPushedOut( 0 ),
74 _BytesPoppedIn( 0 ),
75 _PrevBytesPoppedIn( 0 ),
76 _PrevBytesPushedOut( 0 ),
77 _NbConnections (0),
78 _NoDelay( nodelay ),
79 _ReplayMode( replaymode )
81 nlnettrace( "CBufServer::CBufServer" );
82 if ( ! _ReplayMode )
84 _ListenTask = new CListenTask( this );
85 _ListenThread = IThread::create( _ListenTask, 1024*4*4 );
87 /*{
88 CSynchronized<uint32>::CAccessor syncbpi ( &_BytesPushedIn );
89 syncbpi.value() = 0;
90 }*/
95 * Listens on the specified port
97 void CBufServer::init( uint16 port )
99 nlnettrace( "CBufServer::init" );
100 if ( ! _ReplayMode )
102 _ListenTask->init( port, maxExpectedBlockSize() );
103 _ListenThread->start();
105 else
107 LNETL1_DEBUG( "LNETL1: Binding listen socket to any address, port %hu", port );
113 * Begins to listen on the specified port (call before running thread)
115 void CListenTask::init( uint16 port, sint32 maxExpectedBlockSize )
117 nlnettrace( "CListenTask::init" );
118 _ListenSock.init( port );
119 _MaxExpectedBlockSize = maxExpectedBlockSize;
/***************************************************************************************************
 * User main thread (running)
 **************************************************************************************************/
129 * Constructor
131 CServerTask::CServerTask() : NbLoop (0), _ExitRequired(false)
133 #ifdef NL_OS_UNIX
134 if (pipe( _WakeUpPipeHandle ) == -1)
136 nlwarning("LNETL1: pipe() failed: code=%d '%s'", errno, strerror(errno));
138 #endif
#ifdef NL_OS_UNIX
/*
 * Wake the thread up, when blocked in select (Unix only)
 *
 * Writes a single byte into the wake-up pipe; the value of the byte carries
 * no meaning, only its presence matters. A write failure is only logged.
 */
void CServerTask::wakeUp()
{
	// Initialised so that no indeterminate memory is handed to write()
	uint8 b = 0;
	if ( write( _WakeUpPipeHandle[PipeWrite], &b, 1 ) == -1 )
	{
		LNETL1_DEBUG( "LNETL1: In CServerTask::wakeUp(): write() failed" );
	}
}
#endif
159 * Destructor
161 CServerTask::~CServerTask()
163 #ifdef NL_OS_UNIX
164 close( _WakeUpPipeHandle[PipeRead] );
165 close( _WakeUpPipeHandle[PipeWrite] );
166 #endif
171 * Destructor
173 CBufServer::~CBufServer()
175 nlnettrace( "CBufServer::~CBufServer" );
177 // Clean listen thread exit
178 if ( ! _ReplayMode )
180 ((CListenTask*)(_ListenThread->getRunnable()))->requireExit();
181 ((CListenTask*)(_ListenThread->getRunnable()))->close();
182 #ifdef NL_OS_UNIX
183 _ListenTask->wakeUp();
184 #endif
185 _ListenThread->wait();
186 delete _ListenThread;
187 delete _ListenTask;
189 // Clean receive thread exits
190 CThreadPool::iterator ipt;
192 LNETL1_DEBUG( "LNETL1: Waiting for end of threads..." );
193 CSynchronized<CThreadPool>::CAccessor poolsync( &_ThreadPool );
194 for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
196 // Tell the threads to exit and wake them up
197 CServerReceiveTask *task = receiveTask(ipt);
198 nlnettrace( "Requiring exit" );
199 task->requireExit();
201 // Wake the threads up
202 #ifdef NL_OS_UNIX
203 task->wakeUp();
204 #else
205 CConnections::iterator ipb;
206 nlnettrace( "Disconnecting sockets (Win32)" );
208 CSynchronized<CConnections>::CAccessor connectionssync( &task->_Connections );
209 for ( ipb=connectionssync.value().begin(); ipb!=connectionssync.value().end(); ++ipb )
211 (*ipb)->Sock->disconnect();
214 #endif
218 nlnettrace( "Waiting" );
219 for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
221 // Wait until the threads have exited
222 (*ipt)->wait();
225 LNETL1_DEBUG( "LNETL1: Deleting sockets, tasks and threads..." );
226 for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
228 // Delete the socket objects
229 CServerReceiveTask *task = receiveTask(ipt);
230 CConnections::iterator ipb;
232 CSynchronized<CConnections>::CAccessor connectionssync( &task->_Connections );
233 for ( ipb=connectionssync.value().begin(); ipb!=connectionssync.value().end(); ++ipb )
235 delete (*ipb); // closes and deletes the socket
239 // Delete the task objects
240 delete task;
242 // Delete the thread objects
243 delete (*ipt);
248 nlnettrace( "Exiting CBufServer::~CBufServer" );
253 * Disconnect the specified host
254 * Set hostid to NULL to disconnect all connections.
255 * If hostid is not null and the socket is not connected, the method does nothing.
256 * If quick is true, any pending data will not be sent before disconnecting.
258 void CBufServer::disconnect( TSockId hostid, bool quick )
260 nlnettrace( "CBufServer::disconnect" );
261 if ( hostid != InvalidSockId )
263 if (_ConnectedClients.find(hostid) == _ConnectedClients.end())
265 // this host is not connected
266 return;
269 // Disconnect only if physically connected
270 if ( hostid->Sock->connected() )
272 if ( ! quick )
274 hostid->flush();
276 hostid->Sock->disconnect(); // the connection will be removed by the next call of update()
279 else
281 // Disconnect all
282 CThreadPool::iterator ipt;
284 CSynchronized<CThreadPool>::CAccessor poolsync( &_ThreadPool );
285 for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
287 CServerReceiveTask *task = receiveTask(ipt);
288 CConnections::iterator ipb;
290 CSynchronized<CConnections>::CAccessor connectionssync( &task->_Connections );
291 for ( ipb=connectionssync.value().begin(); ipb!=connectionssync.value().end(); ++ipb )
293 if ( (*ipb)->Sock->connected() )
295 if ( ! quick )
297 (*ipb)->flush();
299 (*ipb)->Sock->disconnect();
310 * Send a message to the specified host
312 void CBufServer::send( const CMemStream& buffer, TSockId hostid )
314 nlnettrace( "CBufServer::send" );
315 nlassert( buffer.length() > 0 );
316 nlassertex( buffer.length() <= maxSentBlockSize(), ("length=%u max=%u", buffer.length(), maxSentBlockSize()) );
318 // slow down the layer H_AUTO (CBufServer_send);
320 if ( hostid != InvalidSockId )
322 if (_ConnectedClients.find(hostid) == _ConnectedClients.end())
324 // this host is not connected
325 return;
328 pushBufferToHost( buffer, hostid );
330 else
332 // Push into all send queues
333 CThreadPool::iterator ipt;
335 CSynchronized<CThreadPool>::CAccessor poolsync( &_ThreadPool );
336 for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
338 CServerReceiveTask *task = receiveTask(ipt);
339 CConnections::iterator ipb;
341 CSynchronized<CConnections>::CAccessor connectionssync( &task->_Connections );
342 for ( ipb=connectionssync.value().begin(); ipb!=connectionssync.value().end(); ++ipb )
344 // Send only if the socket is logically connected
345 if ( (*ipb)->connectedState() )
347 pushBufferToHost( buffer, *ipb );
358 * Checks if there are some data to receive
360 bool CBufServer::dataAvailable()
362 // slow down the layer H_AUTO (CBufServer_dataAvailable);
364 /* If no data available, enter the 'while' loop and return false (1 volatile test)
365 * If there are user data available, enter the 'while' and return true immediately (1 volatile test + 1 short locking)
366 * If there is a connection/disconnection event (rare), call the callback and loop
368 while ( dataAvailableFlag() )
370 // Because _DataAvailable is true, the receive queue is not empty at this point
371 vector<uint8> buffer;
372 uint8 val;
374 CFifoAccessor recvfifo( &receiveQueue() );
375 val = recvfifo.value().frontLast();
376 if ( val != CBufNetBase::User )
378 recvfifo.value().front( buffer );
382 /*sint32 mbsize = recvfifo.value().size() / 1048576;
383 if ( mbsize > 0 )
385 nlwarning( "The receive queue size exceeds %d MB", mbsize );
388 /*vector<uint8> buffer;
389 recvfifo.value().front( buffer );*/
391 #ifdef NL_OS_UNIX
392 uint8 b;
393 if ( read( _DataAvailablePipeHandle[PipeRead], &b, 1 ) == -1 )
394 nlwarning( "LNETL1: Read pipe failed in dataAvailable" );
395 //nldebug( "Pipe: 1 byte read (server %p)", this );
396 #endif
398 // Test if it the next block is a system event
399 //switch ( buffer[buffer.size()-1] )
400 switch ( val )
403 // Normal message available
404 case CBufNetBase::User:
406 return true; // return immediately, do not extract the message
409 // Process disconnection event
410 case CBufNetBase::Disconnection:
412 TSockId sockid = *((TSockId*)(&*buffer.begin()));
413 LNETL1_DEBUG( "LNETL1: Disconnection event for %p %s", sockid, sockid->asString().c_str());
415 sockid->setConnectedState( false );
417 // Call callback if needed
418 if ( disconnectionCallback() != NULL )
420 disconnectionCallback()( sockid, argOfDisconnectionCallback() );
423 // remove from the list of valid client
424 nlverify(_ConnectedClients.erase(sockid) == 1);
426 // Add socket object into the synchronized remove list
427 LNETL1_DEBUG( "LNETL1: Adding the connection to the remove list" );
428 nlassert( ((CServerBufSock*)sockid)->ownerTask() != NULL );
429 ((CServerBufSock*)sockid)->ownerTask()->addToRemoveSet( sockid );
430 break;
432 // Process connection event
433 case CBufNetBase::Connection:
435 TSockId sockid = *((TSockId*)(&*buffer.begin()));
436 LNETL1_DEBUG( "LNETL1: Connection event for %p %s", sockid, sockid->asString().c_str());
438 // add this socket in the list of client
439 nlverify(_ConnectedClients.insert(sockid).second);
441 sockid->setConnectedState( true );
443 // Call callback if needed
444 if ( connectionCallback() != NULL )
446 connectionCallback()( sockid, argOfConnectionCallback() );
448 break;
450 default: // should not occur
451 LNETL1_INFO( "LNETL1: Invalid block type: %hu (should be = to %hu", (uint16)(buffer[buffer.size()-1]), (uint16)(val) );
452 LNETL1_INFO( "LNETL1: Buffer (%d B): [%s]", buffer.size(), stringFromVector(buffer).c_str() );
453 LNETL1_INFO( "LNETL1: Receive queue:" );
455 CFifoAccessor recvfifo( &receiveQueue() );
456 recvfifo.value().display();
458 nlerror( "LNETL1: Invalid system event type in server receive queue" );
462 // Extract system event
464 CFifoAccessor recvfifo( &receiveQueue() );
465 recvfifo.value().pop();
466 setDataAvailableFlag( ! recvfifo.value().empty() );
469 // _DataAvailable is false here
470 return false;
#ifdef NL_OS_UNIX
/* Wait until the receive queue contains something to read (implemented with a select()).
 * This is where the connection/disconnection callbacks can be called.
 * \param usecMax Max time of each select() wait, in microseconds (clamped to 999999)
 */
void CBufServer::sleepUntilDataAvailable( uint usecMax )
{
	// Prevent looping infinitely if the system time was changed
	// (limit not told in Linux man but here: http://docs.hp.com/en/B9106-90009/select.2.html)
	if ( usecMax > 999999 )
	{
		usecMax = 999999;
	}

	for (;;)
	{
		// select() on the data-available pipe only
		fd_set readers;
		FD_ZERO( &readers );
		FD_SET( _DataAvailablePipeHandle[PipeRead], &readers );

		timeval tv;
		tv.tv_sec = 0;
		tv.tv_usec = usecMax;

		int res = ::select( _DataAvailablePipeHandle[PipeRead]+1, &readers, NULL, NULL, &tv );
		if ( res == -1 )
			nlerror( "LNETL1: Select failed in sleepUntilDataAvailable (code %u)", CSock::getLastError() );

		// Loop again if dataAvailable() found nothing for the user (e.g. only a
		// connection/disconnection event was read)
		if ( dataAvailable() )
			break;
	}
}
#endif
504 * Receives next block of data in the specified. The length and hostid are output arguments.
505 * Precond: dataAvailable() has returned true, phostid not null
507 void CBufServer::receive( CMemStream& buffer, TSockId* phostid )
509 nlnettrace( "CBufServer::receive" );
510 //nlassert( dataAvailable() );
511 nlassert( phostid != NULL );
514 CFifoAccessor recvfifo( &receiveQueue() );
515 nlassert( ! recvfifo.value().empty() );
516 recvfifo.value().front( buffer );
517 recvfifo.value().pop();
518 setDataAvailableFlag( ! recvfifo.value().empty() );
521 // Extract hostid (and event type)
522 *phostid = *((TSockId*)&(buffer.buffer()[buffer.size()-sizeof(TSockId)-1]));
523 nlassert( buffer.buffer()[buffer.size()-1] == CBufNetBase::User );
525 buffer.resize( buffer.size()-sizeof(TSockId)-1 );
527 // TODO OPTIM remove the nldebug for speed
528 //commented for optimisation LNETL1_DEBUG( "LNETL1: Read buffer (%d+%d B) from %s", buffer.size(), sizeof(TSockId)+1, /*stringFromVector(buffer).c_str(), */(*phostid)->asString().c_str() );
530 // Statistics
531 _BytesPoppedIn += buffer.size() + sizeof(TBlockSize);
536 * Update the network (call this method evenly)
538 void CBufServer::update()
540 //nlnettrace( "CBufServer::update-BEGIN" );
542 _NbConnections = 0;
544 // For each thread
545 CThreadPool::iterator ipt;
547 //nldebug( "UPD: Acquiring the Thread Pool" );
548 CSynchronized<CThreadPool>::CAccessor poolsync( &_ThreadPool );
549 //nldebug( "UPD: Acquired." );
550 for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
552 // For each thread of the pool
553 CServerReceiveTask *task = receiveTask(ipt);
554 CConnections::iterator ipb;
556 CSynchronized<CConnections>::CAccessor connectionssync( &task->_Connections );
557 for ( ipb=connectionssync.value().begin(); ipb!=connectionssync.value().end(); ++ipb )
559 // For each socket of the thread, update sending
560 if ( ! ((*ipb)->Sock->connected() && (*ipb)->update()) )
562 // Update did not work or the socket is not connected anymore
563 LNETL1_DEBUG( "LNETL1: Socket %s is disconnected", (*ipb)->asString().c_str() );
564 // Disconnection event if disconnected (known either from flush (in update) or when receiving data)
565 (*ipb)->advertiseDisconnection( this, *ipb );
567 /*if ( (*ipb)->advertiseDisconnection( this, *ipb ) )
569 // Now the connection removal is in dataAvailable()
570 // POLL6
573 else
575 _NbConnections++;
582 //nlnettrace( "CBufServer::update-END" );
585 uint32 CBufServer::getSendQueueSize( TSockId destid )
587 if ( destid != InvalidSockId )
589 if (_ConnectedClients.find(destid) == _ConnectedClients.end())
591 // this host is not connected
592 return 0;
595 return destid->SendFifo.size();
597 else
599 // add all client buffers
601 uint32 total = 0;
603 // For each thread
604 CThreadPool::iterator ipt;
606 CSynchronized<CThreadPool>::CAccessor poolsync( &_ThreadPool );
607 for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
609 // For each thread of the pool
610 CServerReceiveTask *task = receiveTask(ipt);
611 CConnections::iterator ipb;
613 CSynchronized<CConnections>::CAccessor connectionssync( &task->_Connections );
614 for ( ipb=connectionssync.value().begin(); ipb!=connectionssync.value().end(); ++ipb )
616 // For each socket of the thread, update sending
617 total = (*ipb)->SendFifo.size ();
622 return total;
626 void CBufServer::displayThreadStat (NLMISC::CLog *log)
628 // For each thread
629 CThreadPool::iterator ipt;
631 CSynchronized<CThreadPool>::CAccessor poolsync( &_ThreadPool );
632 for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
634 // For each thread of the pool
635 CServerReceiveTask *task = receiveTask(ipt);
636 // For each socket of the thread, update sending
637 log->displayNL ("server receive thread %p nbloop %d", task, task->NbLoop);
641 log->displayNL ("server listen thread %p nbloop %d", _ListenTask, _ListenTask->NbLoop);
644 void CBufServer::setTimeFlushTrigger( TSockId destid, sint32 ms )
646 nlassert( destid != InvalidSockId );
647 if (_ConnectedClients.find(destid) != _ConnectedClients.end())
648 destid->setTimeFlushTrigger( ms );
651 void CBufServer::setSizeFlushTrigger( TSockId destid, sint32 size )
653 nlassert( destid != InvalidSockId );
654 if (_ConnectedClients.find(destid) != _ConnectedClients.end())
655 destid->setSizeFlushTrigger( size );
658 bool CBufServer::flush( TSockId destid, uint *nbBytesRemaining)
660 nlassert( destid != InvalidSockId );
661 if (_ConnectedClients.find(destid) != _ConnectedClients.end())
662 return destid->flush( nbBytesRemaining );
663 else
664 return true;
666 const CInetAddress& CBufServer::hostAddress( TSockId hostid )
668 nlassert( hostid != InvalidSockId );
669 if (_ConnectedClients.find(hostid) != _ConnectedClients.end())
670 return hostid->Sock->remoteAddr();
672 static CInetAddress nullAddr;
673 return nullAddr;
676 void CBufServer::displaySendQueueStat (NLMISC::CLog *log, TSockId destid)
678 if ( destid != InvalidSockId )
680 if (_ConnectedClients.find(destid) == _ConnectedClients.end())
682 // this host is not connected
683 return;
686 destid->SendFifo.displayStats(log);
688 else
690 // add all client buffers
692 // For each thread
693 CThreadPool::iterator ipt;
695 CSynchronized<CThreadPool>::CAccessor poolsync( &_ThreadPool );
696 for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
698 // For each thread of the pool
699 CServerReceiveTask *task = receiveTask(ipt);
700 CConnections::iterator ipb;
702 CSynchronized<CConnections>::CAccessor connectionssync( &task->_Connections );
703 for ( ipb=connectionssync.value().begin(); ipb!=connectionssync.value().end(); ++ipb )
705 // For each socket of the thread, update sending
706 (*ipb)->SendFifo.displayStats(log);
716 * Returns the number of bytes received since the previous call to this method
718 uint64 CBufServer::newBytesReceived()
720 uint64 b = bytesReceived();
721 uint64 nbrecvd = b - _PrevBytesPoppedIn;
722 //nlinfo( "b: %" NL_I64 "u new: %" NL_I64 "u", b, nbrecvd );
723 _PrevBytesPoppedIn = b;
724 return nbrecvd;
728 * Returns the number of bytes sent since the previous call to this method
730 uint64 CBufServer::newBytesSent()
732 uint64 b = bytesSent();
733 uint64 nbsent = b - _PrevBytesPushedOut;
734 //nlinfo( "b: %" NL_I64 "u new: %" NL_I64 "u", b, nbsent );
735 _PrevBytesPushedOut = b;
736 return nbsent;
/***************************************************************************************************
 * Listen thread
 **************************************************************************************************/
746 * Code of listening thread
748 void CListenTask::run()
750 NbNetworkTask++;
751 NbServerListenTask++;
753 nlnettrace( "CListenTask::run" );
755 fd_set readers;
756 #ifdef NL_OS_UNIX
757 SOCKET descmax;
758 descmax = _ListenSock.descriptor()>_WakeUpPipeHandle[PipeRead]?_ListenSock.descriptor():_WakeUpPipeHandle[PipeRead];
759 #endif
761 // Accept connections
762 while ( ! exitRequired() )
766 LNETL1_DEBUG( "LNETL1: Waiting incoming connection..." );
767 // Get and setup the new socket
768 #ifdef NL_OS_UNIX
769 FD_ZERO( &readers );
770 FD_SET( _ListenSock.descriptor(), &readers );
771 FD_SET( _WakeUpPipeHandle[PipeRead], &readers );
772 int res = ::select( descmax+1, &readers, NULL, NULL, NULL ); /// Wait indefinitely
774 switch ( res )
776 //case 0 : continue; // time-out expired, no results
777 case -1 :
778 // we'll ignore message (Interrupted system call) caused by a CTRL-C
779 if (CSock::getLastError() == 4)
781 LNETL1_DEBUG ("LNETL1: Select failed (in listen thread): %s (code %u) but IGNORED", CSock::errorString( CSock::getLastError() ).c_str(), CSock::getLastError());
782 continue;
784 nlerror( "LNETL1: Select failed (in listen thread): %s (code %u)", CSock::errorString( CSock::getLastError() ).c_str(), CSock::getLastError() );
787 if ( FD_ISSET( _WakeUpPipeHandle[PipeRead], &readers ) )
789 uint8 b;
790 if ( read( _WakeUpPipeHandle[PipeRead], &b, 1 ) == -1 ) // we were woken-up by the wake-up pipe
792 LNETL1_DEBUG( "LNETL1: In CListenTask::run(): read() failed" );
794 LNETL1_DEBUG( "LNETL1: listen thread select woken-up" );
795 continue;
797 #elif defined (NL_OS_WINDOWS)
798 FD_ZERO( &readers );
799 FD_SET( _ListenSock.descriptor(), &readers );
800 int res = ::select( 1, &readers, NULL, NULL, NULL ); /// Wait indefinitely
802 if ( res == -1)
804 nlerror( "LNETL1: Select failed (in listen thread): %s (code %u)", CSock::errorString( CSock::getLastError() ).c_str(), CSock::getLastError() );
805 continue;
807 #endif
808 LNETL1_DEBUG( "LNETL1: Accepting an incoming connection..." );
809 CTcpSock *newSock = _ListenSock.accept();
810 if (newSock != NULL)
812 CServerBufSock *bufsock = new CServerBufSock( newSock );
813 LNETL1_DEBUG( "LNETL1: New connection : %s", bufsock->asString().c_str() );
814 bufsock->setNonBlocking();
815 bufsock->setMaxExpectedBlockSize( _MaxExpectedBlockSize );
816 if ( _Server->noDelay() )
818 bufsock->Sock->setNoDelay( true );
821 // Notify the new connection
822 bufsock->advertiseConnection( _Server );
824 // Dispatch the socket into the thread pool
825 _Server->dispatchNewSocket( bufsock );
828 NbLoop++;
830 catch (const ESocket &e)
832 LNETL1_INFO( "LNETL1: Exception in listen thread: %s", e.what() );
833 // It can occur when too many sockets are open (e.g. 885 connections)
837 nlnettrace( "Exiting CListenTask::run" );
838 NbServerListenTask--;
839 NbNetworkTask--;
842 /// Close listening socket
843 void CListenTask::close()
845 _ListenSock.close();
846 // _ListenSock.disconnect();
851 * Binds a new socket and send buffer to an existing or a new thread
852 * Note: this method is called in the listening thread.
854 void CBufServer::dispatchNewSocket( CServerBufSock *bufsock )
856 nlnettrace( "CBufServer::dispatchNewSocket" );
858 CSynchronized<CThreadPool>::CAccessor poolsync( &_ThreadPool );
859 if ( _ThreadStrategy == SpreadSockets )
861 // Find the thread with the smallest number of connections and check if all
862 // threads do not have the same number of connections
863 uint min = 0xFFFFFFFF;
864 uint max = 0;
865 CThreadPool::iterator ipt, iptmin, iptmax;
866 for ( iptmin=iptmax=ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
868 uint noc = receiveTask(ipt)->numberOfConnections();
869 if ( noc < min )
871 min = noc;
872 iptmin = ipt;
874 if ( noc > max )
876 max = noc;
877 iptmax = ipt;
881 // Check if we make the pool of threads grow (if we have not found vacant room
882 // and if it is allowed to)
883 if ( (poolsync.value().empty()) ||
884 ((min == max) && (poolsync.value().size() < _MaxThreads)) )
886 addNewThread( poolsync.value(), bufsock );
888 else
890 // Dispatch socket to an existing thread of the pool
891 CServerReceiveTask *task = receiveTask(iptmin);
892 bufsock->setOwnerTask( task );
893 task->addNewSocket( bufsock );
894 #ifdef NL_OS_UNIX
895 task->wakeUp();
896 #endif
898 if ( min >= (uint)_MaxSocketsPerThread )
900 nlwarning( "LNETL1: Exceeding the maximum number of sockets per thread" );
902 LNETL1_DEBUG( "LNETL1: New socket dispatched to thread %d", iptmin-poolsync.value().begin() );
906 else // _ThreadStrategy == FillThreads
908 CThreadPool::iterator ipt;
909 for ( ipt=poolsync.value().begin(); ipt!=poolsync.value().end(); ++ipt )
911 uint noc = receiveTask(ipt)->numberOfConnections();
912 if ( noc < _MaxSocketsPerThread )
914 break;
918 // Check if we have to make the thread pool grow (if we have not found vacant room)
919 if ( ipt == poolsync.value().end() )
921 if ( poolsync.value().size() == _MaxThreads )
923 nlwarning( "LNETL1: Exceeding the maximum number of threads" );
925 addNewThread( poolsync.value(), bufsock );
927 else
929 // Dispatch socket to an existing thread of the pool
930 CServerReceiveTask *task = receiveTask(ipt);
931 bufsock->setOwnerTask( task );
932 task->addNewSocket( bufsock );
933 #ifdef NL_OS_UNIX
934 task->wakeUp();
935 #endif
936 LNETL1_DEBUG( "LNETL1: New socket dispatched to thread %d", ipt-poolsync.value().begin() );
943 * Creates a new task and run a new thread for it
944 * Precond: bufsock not null
946 void CBufServer::addNewThread( CThreadPool& threadpool, CServerBufSock *bufsock )
948 nlnettrace( "CBufServer::addNewThread" );
949 nlassert( bufsock != NULL );
951 // Create new task and dispatch the socket to it
952 CServerReceiveTask *task = new CServerReceiveTask( this );
953 bufsock->setOwnerTask( task );
954 task->addNewSocket( bufsock );
956 // Add a new thread to the pool, with this task
957 IThread *thr = IThread::create( task, 1024*4*4 );
959 threadpool.push_back( thr );
960 thr->start();
961 LNETL1_DEBUG( "LNETL1: Added a new thread; pool size is %d", threadpool.size() );
962 LNETL1_DEBUG( "LNETL1: New socket dispatched to thread %d", threadpool.size()-1 );
/***************************************************************************************************
 * Receive threads
 **************************************************************************************************/
973 * Code of receiving threads for servers
// Body of a receive thread.
// Counts itself in NbNetworkTask / NbServerReceiveTask on entry and uncounts on exit.
// Until exitRequired(): removes closed connections, builds the select() set from the
// connected sockets of _Connections (copied into connections_copy while the accessor
// is held), waits in select(), then for every readable socket calls receivePart() and,
// when that returns true, pushes the received buffer into the server receive queue.
// A select() failure ends the loop through the 'end' label.
// NOTE(review): this excerpt has lost brace-only lines and several comment delimiters;
// in particular it cannot be told from here whether the 'skip = true' statement near
// the end of the socket loop is live code or commented out - check the real file.
975 void CServerReceiveTask::run()
977 NbNetworkTask++;
978 NbServerReceiveTask++;
979 nlnettrace( "CServerReceiveTask::run" );
981 SOCKET descmax;
982 fd_set readers;
984 #if defined NL_OS_UNIX
985 // POLL7
986 if (nice( 2 ) == -1) // is this really useful as long as select() sleeps?
988 nlwarning("LNETL1: nice() failed: code=%d '%s'", errno, strerror(errno));
990 #endif // NL_OS_UNIX
992 // Copy of _Connections
993 vector<TSockId> connections_copy;
995 while ( ! exitRequired() )
997 // 1. Remove closed connections
998 clearClosedConnections();
1000 // POLL8
1002 // 2-SELECT-VERSION : select() on the sockets handled in the present thread
1004 descmax = 0;
1005 FD_ZERO( &readers );
// skip: true when _Connections is empty; alldisconnected: true until a connected socket is seen
1006 bool skip;
1007 bool alldisconnected = true;
1008 CConnections::iterator ipb;
1010 // Lock _Connections
1011 CSynchronized<CConnections>::CAccessor connectionssync( &_Connections );
1013 // Prepare to avoid select if there is no connection
1014 skip = connectionssync.value().empty();
1016 // Fill the select array and copy _Connections
1017 connections_copy.clear();
1018 for ( ipb=connectionssync.value().begin(); ipb!=connectionssync.value().end(); ++ipb )
1020 if ( (*ipb)->Sock->connected() ) // exclude disconnected sockets that are not deleted
1022 alldisconnected = false;
1023 // Copy _Connections element
1024 connections_copy.push_back( *ipb );
1026 // Add socket descriptor to the select array
1027 FD_SET( (*ipb)->Sock->descriptor(), &readers );
1029 // Calculate descmax for select
1030 if ( (*ipb)->Sock->descriptor() > descmax )
1032 descmax = (*ipb)->Sock->descriptor();
1037 #ifdef NL_OS_UNIX
1038 // Add the wake-up pipe into the select array
1039 FD_SET( _WakeUpPipeHandle[PipeRead], &readers );
1040 if ( _WakeUpPipeHandle[PipeRead]>descmax )
1042 descmax = _WakeUpPipeHandle[PipeRead];
1044 #endif
1046 // Unlock _Connections, use connections_copy instead
1049 #ifndef NL_OS_UNIX
1050 // Avoid select if there is no connection (Windows only)
1051 if ( skip || alldisconnected )
1053 nlSleep( 1 ); // nice
1054 continue;
1056 #endif
// Windows waits with a 10 ms timeout; Unix waits without timeout (its set includes the wake-up pipe)
1058 #ifdef NL_OS_WINDOWS
1059 TIMEVAL tv;
1060 tv.tv_sec = 0; // short time because the newly added connections can't be added to the select fd_set
1061 tv.tv_usec = 10000;
1063 // Call select
1064 int res = ::select( descmax+1, &readers, NULL, NULL, &tv );
1066 #elif defined NL_OS_UNIX
1068 // Call select
1069 int res = ::select( descmax+1, &readers, NULL, NULL, NULL );
1071 #endif // NL_OS_WINDOWS
1074 // POLL9
1076 // 3. Test the result
1077 switch ( res )
1079 #ifdef NL_OS_WINDOWS
1080 case 0 : continue; // time-out expired, no results
1081 #endif
1082 case -1 :
1083 // we'll ignore message (Interrupted system call) caused by a CTRL-C
1084 /*if (CSock::getLastError() == 4)
1086 LNETL1_DEBUG ("LNETL1: Select failed (in receive thread): %s (code %u) but IGNORED", CSock::errorString( CSock::getLastError() ).c_str(), CSock::getLastError());
1087 continue;
1089 //nlerror( "LNETL1: Select failed (in receive thread): %s (code %u)", CSock::errorString( CSock::getLastError() ).c_str(), CSock::getLastError() );
1090 LNETL1_DEBUG( "LNETL1: Select failed (in receive thread): %s (code %u)", CSock::errorString( CSock::getLastError() ).c_str(), CSock::getLastError() );
1091 goto end;
1094 // 4. Get results
1096 vector<TSockId>::iterator ic;
1097 for ( ic=connections_copy.begin(); ic!=connections_copy.end(); ++ic )
1099 if ( FD_ISSET( (*ic)->Sock->descriptor(), &readers ) != 0 )
1101 CServerBufSock *serverbufsock = static_cast<CServerBufSock*>(static_cast<CBufSock*>(*ic));
1104 // 4. Receive data
1105 if ( serverbufsock->receivePart( sizeof(TSockId) + 1 ) ) // +1 for the event type
1107 serverbufsock->fillSockIdAndEventType( *ic );
1109 // Push message into receive queue
1110 //uint32 bufsize;
1111 //sint32 mbsize;
1113 _Server->pushMessageIntoReceiveQueue( serverbufsock->receivedBuffer() );
1115 //recvfifo.value().display();
1116 //bufsize = serverbufsock->receivedBuffer().size();
1117 //mbsize = recvfifo.value().size() / 1048576;
1118 //nldebug( "RCV: Released." );
1119 /*if ( mbsize > 1 )
1121 nlwarning( "The receive queue size exceeds %d MB", mbsize );
1124 // Statistics
1126 CSynchronized<uint32>::CAccessor syncbpi ( &_Server->syncBytesPushedIn() );
1127 syncbpi.value() += bufsize;
1132 // catch (const ESocketConnectionClosed&)
1133 // {
1134 // LNETL1_DEBUG( "LNETL1: Connection %s closed", serverbufsock->asString().c_str() );
1135 // // The socket went to _Connected=false when throwing the exception
1136 // }
1137 catch (const ESocket&)
1139 LNETL1_DEBUG( "LNETL1: Connection %s broken", serverbufsock->asString().c_str() );
1140 (*ic)->Sock->disconnect();
1143 #ifdef NL_OS_UNIX
1144 skip = true; // don't check _WakeUpPipeHandle (yes, check it to read any written byte)
1145 #endif
1151 #ifdef NL_OS_UNIX
1152 // Test wake-up pipe
1153 if ( (!skip) && (FD_ISSET( _WakeUpPipeHandle[PipeRead], &readers )) )
1155 uint8 b;
1156 if ( read( _WakeUpPipeHandle[PipeRead], &b, 1 ) == -1 ) // we were woken-up by the wake-up pipe
1158 LNETL1_DEBUG( "LNETL1: In CServerReceiveTask::run(): read() failed" );
1160 LNETL1_DEBUG( "LNETL1: Receive thread select woken-up" );
1162 #endif
1164 NbLoop++;
// Reached on normal loop exit and through 'goto end' after a select() failure
1166 end:
1167 nlnettrace( "Exiting CServerReceiveTask::run" );
1168 NbServerReceiveTask--;
1169 NbNetworkTask--;
1174 * Delete all connections referenced in the remove list (double-mutexed)
1177 void CServerReceiveTask::clearClosedConnections()
1179 CConnections::iterator ic;
1181 NLMISC::CSynchronized<CConnections>::CAccessor removesetsync( &_RemoveSet );
1183 if ( ! removesetsync.value().empty() )
1185 // Delete closed connections
1186 NLMISC::CSynchronized<CConnections>::CAccessor connectionssync( &_Connections );
1187 for ( ic=removesetsync.value().begin(); ic!=removesetsync.value().end(); ++ic )
1189 LNETL1_DEBUG( "LNETL1: Removing a connection" );
1191 TSockId sid = (*ic);
1193 // Remove from the connection list
1194 connectionssync.value().erase( *ic );
1196 // Delete the socket object
1197 delete sid;
1199 // Clear remove list
1200 removesetsync.value().clear();
1206 NLMISC_CATEGORISED_VARIABLE(nel, uint32, NbServerListenTask, "Number of server listen thread");
1207 NLMISC_CATEGORISED_VARIABLE(nel, uint32, NbServerReceiveTask, "Number of server receive thread");
1209 } // NLNET