1 // NeL - MMORPG Framework <http://dev.ryzom.com/projects/nel/>
2 // Copyright (C) 2010 Winch Gate Property Limited
4 // This source file has been modified by the following contributors:
5 // Copyright (C) 2014-2016 Jan BOON (Kaetemi) <jan.boon@kaetemi.be>
7 // This program is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Affero General Public License as
9 // published by the Free Software Foundation, either version 3 of the
10 // License, or (at your option) any later version.
12 // This program is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU Affero General Public License for more details.
17 // You should have received a copy of the GNU Affero General Public License
18 // along with this program. If not, see <http://www.gnu.org/licenses/>.
22 #include "nel/misc/hierarchical_timer.h"
24 #include "nel/net/buf_server.h"
25 #include "nel/net/net_log.h"
28 # ifndef NL_COMP_MINGW
32 #elif defined NL_OS_UNIX
35 # include <sys/types.h>
36 # include <sys/time.h>
40 * On Linux, the default limit of descriptors is usually 1024, you can increase it with ulimit
43 using namespace NLMISC
;
// Global counters of running server threads. NbServerListenTask is incremented at the
// start of CListenTask::run() and decremented when it exits; NbServerReceiveTask follows
// the same pattern in CServerReceiveTask::run(). Both are registered with
// NLMISC_CATEGORISED_VARIABLE at the end of the file.
48 uint32 NbServerListenTask
= 0;
49 uint32 NbServerReceiveTask
= 0;
51 /***************************************************************************************************
52 * User main thread (initialization)
53 **************************************************************************************************/
// Constructor. Stores the threading parameters (strategy, max threads, max sockets per
// thread) and the replay flag, forwards initPipeForDataAvailable to CBufNetBase, then
// creates the listen task and the thread object that will run it. The thread is only
// created here; the visible call to start() is in CBufServer::init().
// NOTE(review): the embedded line numbers have gaps, so some member initialisers and body
// statements are missing from this listing; in particular no visible line consumes
// `nodelay`, and the use of `syncbpi` after its construction is not shown.
58 CBufServer::CBufServer( TThreadStategy strategy
,
59 uint16 max_threads
, uint16 max_sockets_per_thread
, bool nodelay
, bool replaymode
, bool initPipeForDataAvailable
) :
61 CBufNetBase( initPipeForDataAvailable
),
65 _ThreadStrategy( strategy
),
66 _MaxThreads( max_threads
),
67 _MaxSocketsPerThread( max_sockets_per_thread
),
69 _ListenThread( NULL
),
70 _ThreadPool("CBufServer::_ThreadPool"),
71 _ConnectionCallback( NULL
),
72 _ConnectionCbArg( NULL
),
75 _PrevBytesPoppedIn( 0 ),
76 _PrevBytesPushedOut( 0 ),
79 _ReplayMode( replaymode
)
81 nlnettrace( "CBufServer::CBufServer" );
84 _ListenTask
= new CListenTask( this );
// 1024*4*4 is the second argument of IThread::create - presumably a stack size in bytes; TODO confirm
85 _ListenThread
= IThread::create( _ListenTask
, 1024*4*4 );
88 CSynchronized<uint32>::CAccessor syncbpi ( &_BytesPushedIn );
95 * Listens on the specified port
// Starts the server on the given port.
// The visible statements initialise the listen task with `port` and maxExpectedBlockSize(),
// start the listen thread, and emit a debug line about binding the listen socket.
// NOTE(review): source lines 100-101 and 104-106 are missing from this listing, so the
// condition that selects between starting the thread and only logging is not visible here
// (presumably related to _ReplayMode) - confirm against the full source.
97 void CBufServer::init( uint16 port
)
99 nlnettrace( "CBufServer::init" );
102 _ListenTask
->init( port
, maxExpectedBlockSize() );
103 _ListenThread
->start();
107 LNETL1_DEBUG( "LNETL1: Binding listen socket to any address, port %hu", port
);
113 * Begins to listen on the specified port (call before running thread)
115 void CListenTask::init( uint16 port
, sint32 maxExpectedBlockSize
)
117 nlnettrace( "CListenTask::init" );
118 _ListenSock
.init( port
);
119 _MaxExpectedBlockSize
= maxExpectedBlockSize
;
123 /***************************************************************************************************
124 * User main thread (running)
125 **************************************************************************************************/
// Constructor. Resets the loop counter and the exit flag, then creates the wake-up pipe
// with pipe(); a failure is only reported through nlwarning with errno and its message.
// NOTE(review): the wake-up mechanism is documented as Unix only, but the platform guard
// lines are missing from this listing - confirm against the full source.
131 CServerTask::CServerTask() : NbLoop (0), _ExitRequired(false)
134 if (pipe( _WakeUpPipeHandle
) == -1)
136 nlwarning("LNETL1: pipe() failed: code=%d '%s'", errno
, strerror(errno
));
145 * Wake the thread up, when blocked in select (Unix only)
// Wakes the task's thread by writing one byte to the write end of the wake-up pipe.
// A failed write() is only logged at debug level.
// NOTE(review): the declaration of `b` and any platform guard are missing from this listing.
147 void CServerTask::wakeUp()
150 if ( write( _WakeUpPipeHandle
[PipeWrite
], &b
, 1 ) == -1 )
152 LNETL1_DEBUG( "LNETL1: In CServerTask::wakeUp(): write() failed" );
// Destructor. Closes both ends (read, then write) of the wake-up pipe created by the constructor.
// NOTE(review): any platform guard around these calls is missing from this listing.
161 CServerTask::~CServerTask()
164 close( _WakeUpPipeHandle
[PipeRead
] );
165 close( _WakeUpPipeHandle
[PipeWrite
] );
// Destructor. Shuts the server down in stages, as far as this listing shows:
//  1. asks the listen task to exit, closes it, wakes it up, then waits for and deletes
//     the listen thread;
//  2. walks the thread pool (under the pool lock) and disconnects the sockets of each
//     receive task;
//  3. walks the pool again to wait for the receive threads;
//  4. walks the pool a last time to delete the socket objects, then the task and thread
//     objects.
// NOTE(review): many source lines are missing (see the gaps in the embedded numbering),
// including the statements under "Wake the threads up", "Wait until the threads have
// exited" and "Delete the task / thread objects", and any condition around stage 1 -
// confirm against the full source.
173 CBufServer::~CBufServer()
175 nlnettrace( "CBufServer::~CBufServer" );
177 // Clean listen thread exit
180 ((CListenTask
*)(_ListenThread
->getRunnable()))->requireExit();
181 ((CListenTask
*)(_ListenThread
->getRunnable()))->close();
183 _ListenTask
->wakeUp();
185 _ListenThread
->wait();
186 delete _ListenThread
;
189 // Clean receive thread exits
190 CThreadPool::iterator ipt
;
192 LNETL1_DEBUG( "LNETL1: Waiting for end of threads..." );
193 CSynchronized
<CThreadPool
>::CAccessor
poolsync( &_ThreadPool
);
194 for ( ipt
=poolsync
.value().begin(); ipt
!=poolsync
.value().end(); ++ipt
)
196 // Tell the threads to exit and wake them up
197 CServerReceiveTask
*task
= receiveTask(ipt
);
198 nlnettrace( "Requiring exit" );
201 // Wake the threads up
205 CConnections::iterator ipb
;
206 nlnettrace( "Disconnecting sockets (Win32)" );
208 CSynchronized
<CConnections
>::CAccessor
connectionssync( &task
->_Connections
);
209 for ( ipb
=connectionssync
.value().begin(); ipb
!=connectionssync
.value().end(); ++ipb
)
211 (*ipb
)->Sock
->disconnect();
218 nlnettrace( "Waiting" );
219 for ( ipt
=poolsync
.value().begin(); ipt
!=poolsync
.value().end(); ++ipt
)
221 // Wait until the threads have exited
225 LNETL1_DEBUG( "LNETL1: Deleting sockets, tasks and threads..." );
226 for ( ipt
=poolsync
.value().begin(); ipt
!=poolsync
.value().end(); ++ipt
)
228 // Delete the socket objects
229 CServerReceiveTask
*task
= receiveTask(ipt
);
230 CConnections::iterator ipb
;
232 CSynchronized
<CConnections
>::CAccessor
connectionssync( &task
->_Connections
);
233 for ( ipb
=connectionssync
.value().begin(); ipb
!=connectionssync
.value().end(); ++ipb
)
235 delete (*ipb
); // closes and deletes the socket
239 // Delete the task objects
242 // Delete the thread objects
248 nlnettrace( "Exiting CBufServer::~CBufServer" );
253 * Disconnect the specified host
254 * Set hostid to NULL to disconnect all connections.
255 * If hostid is not null and the socket is not connected, the method does nothing.
256 * If quick is true, any pending data will not be sent before disconnecting.
// Disconnects one host, or every connection when hostid is InvalidSockId.
// Specific host: the host is looked up in _ConnectedClients first, and the socket is only
// disconnected when it reports connected(); the connection object itself is removed later
// by update(), as the inline comment states.
// All hosts: walks every receive task of the pool (pool and connection lists locked) and
// disconnects each socket that is still connected.
// NOTE(review): source lines 266, 272-275 and 295-298 are missing from this listing. They
// presumably hold the early return for an unknown host and the handling of `quick`
// (flushing pending data before disconnecting) - no visible line reads `quick`; confirm.
258 void CBufServer::disconnect( TSockId hostid
, bool quick
)
260 nlnettrace( "CBufServer::disconnect" );
261 if ( hostid
!= InvalidSockId
)
263 if (_ConnectedClients
.find(hostid
) == _ConnectedClients
.end())
265 // this host is not connected
269 // Disconnect only if physically connected
270 if ( hostid
->Sock
->connected() )
276 hostid
->Sock
->disconnect(); // the connection will be removed by the next call of update()
282 CThreadPool::iterator ipt
;
284 CSynchronized
<CThreadPool
>::CAccessor
poolsync( &_ThreadPool
);
285 for ( ipt
=poolsync
.value().begin(); ipt
!=poolsync
.value().end(); ++ipt
)
287 CServerReceiveTask
*task
= receiveTask(ipt
);
288 CConnections::iterator ipb
;
290 CSynchronized
<CConnections
>::CAccessor
connectionssync( &task
->_Connections
);
291 for ( ipb
=connectionssync
.value().begin(); ipb
!=connectionssync
.value().end(); ++ipb
)
293 if ( (*ipb
)->Sock
->connected() )
299 (*ipb
)->Sock
->disconnect();
310 * Send a message to the specified host
312 void CBufServer::send( const CMemStream
& buffer
, TSockId hostid
)
314 nlnettrace( "CBufServer::send" );
315 nlassert( buffer
.length() > 0 );
316 nlassertex( buffer
.length() <= maxSentBlockSize(), ("length=%u max=%u", buffer
.length(), maxSentBlockSize()) );
318 // slow down the layer H_AUTO (CBufServer_send);
320 if ( hostid
!= InvalidSockId
)
322 if (_ConnectedClients
.find(hostid
) == _ConnectedClients
.end())
324 // this host is not connected
328 pushBufferToHost( buffer
, hostid
);
332 // Push into all send queues
333 CThreadPool::iterator ipt
;
335 CSynchronized
<CThreadPool
>::CAccessor
poolsync( &_ThreadPool
);
336 for ( ipt
=poolsync
.value().begin(); ipt
!=poolsync
.value().end(); ++ipt
)
338 CServerReceiveTask
*task
= receiveTask(ipt
);
339 CConnections::iterator ipb
;
341 CSynchronized
<CConnections
>::CAccessor
connectionssync( &task
->_Connections
);
342 for ( ipb
=connectionssync
.value().begin(); ipb
!=connectionssync
.value().end(); ++ipb
)
344 // Send only if the socket is logically connected
345 if ( (*ipb
)->connectedState() )
347 pushBufferToHost( buffer
, *ipb
);
358 * Checks if there are some data to receive
// Checks whether a user message is waiting in the receive queue.
// Loops while dataAvailableFlag() is set. The type of the front block is read with
// frontLast(): a CBufNetBase::User block makes the function return true without
// extracting the message, whereas Disconnection and Connection events are handled here
// (connected state, user callbacks, _ConnectedClients bookkeeping, remove list) and are
// then popped from the queue, after which the data-available flag is refreshed.
// NOTE(review): this listing has lost lines (declarations of `val` and `b`, the switch
// header, braces, platform guards, the final return) - confirm details against the
// full source before relying on this summary.
360 bool CBufServer::dataAvailable()
362 // slow down the layer H_AUTO (CBufServer_dataAvailable);
364 /* If no data available, enter the 'while' loop and return false (1 volatile test)
365 * If there are user data available, enter the 'while' and return true immediately (1 volatile test + 1 short locking)
366 * If there is a connection/disconnection event (rare), call the callback and loop
368 while ( dataAvailableFlag() )
370 // Because _DataAvailable is true, the receive queue is not empty at this point
371 vector
<uint8
> buffer
;
374 CFifoAccessor
recvfifo( &receiveQueue() );
375 val
= recvfifo
.value().frontLast();
376 if ( val
!= CBufNetBase::User
)
378 recvfifo
.value().front( buffer
);
382 /*sint32 mbsize = recvfifo.value().size() / 1048576;
385 nlwarning( "The receive queue size exceeds %d MB", mbsize );
388 /*vector<uint8> buffer;
389 recvfifo.value().front( buffer );*/
393 if ( read( _DataAvailablePipeHandle
[PipeRead
], &b
, 1 ) == -1 )
394 nlwarning( "LNETL1: Read pipe failed in dataAvailable" );
395 //nldebug( "Pipe: 1 byte read (server %p)", this );
398 // Test if it the next block is a system event
399 //switch ( buffer[buffer.size()-1] )
403 // Normal message available
404 case CBufNetBase::User
:
406 return true; // return immediately, do not extract the message
409 // Process disconnection event
410 case CBufNetBase::Disconnection
:
412 TSockId sockid
= *((TSockId
*)(&*buffer
.begin()));
413 LNETL1_DEBUG( "LNETL1: Disconnection event for %p %s", sockid
, sockid
->asString().c_str());
415 sockid
->setConnectedState( false );
417 // Call callback if needed
418 if ( disconnectionCallback() != NULL
)
420 disconnectionCallback()( sockid
, argOfDisconnectionCallback() );
423 // remove from the list of valid client
424 nlverify(_ConnectedClients
.erase(sockid
) == 1);
426 // Add socket object into the synchronized remove list
427 LNETL1_DEBUG( "LNETL1: Adding the connection to the remove list" );
428 nlassert( ((CServerBufSock
*)sockid
)->ownerTask() != NULL
);
429 ((CServerBufSock
*)sockid
)->ownerTask()->addToRemoveSet( sockid
);
432 // Process connection event
433 case CBufNetBase::Connection
:
435 TSockId sockid
= *((TSockId
*)(&*buffer
.begin()));
436 LNETL1_DEBUG( "LNETL1: Connection event for %p %s", sockid
, sockid
->asString().c_str());
438 // add this socket in the list of client
439 nlverify(_ConnectedClients
.insert(sockid
).second
);
441 sockid
->setConnectedState( true );
443 // Call callback if needed
444 if ( connectionCallback() != NULL
)
446 connectionCallback()( sockid
, argOfConnectionCallback() );
450 default: // should not occur
451 LNETL1_INFO( "LNETL1: Invalid block type: %hu (should be = to %hu", (uint16
)(buffer
[buffer
.size()-1]), (uint16
)(val
) );
452 LNETL1_INFO( "LNETL1: Buffer (%d B): [%s]", buffer
.size(), stringFromVector(buffer
).c_str() );
453 LNETL1_INFO( "LNETL1: Receive queue:" );
455 CFifoAccessor
recvfifo( &receiveQueue() );
456 recvfifo
.value().display();
458 nlerror( "LNETL1: Invalid system event type in server receive queue" );
462 // Extract system event
464 CFifoAccessor
recvfifo( &receiveQueue() );
465 recvfifo
.value().pop();
466 setDataAvailableFlag( ! recvfifo
.value().empty() );
469 // _DataAvailable is false here
476 /* Wait until the receive queue contains something to read (implemented with a select()).
477 * This is where the connection/disconnection callbacks can be called.
478 * \param usecMax Max time to wait in microseconds (up to 1 sec)
// Sleeps in select() on the read end of the data-available pipe, with usecMax used as the
// tv_usec part of the timeout, then polls dataAvailable(). The trailing
// `while ( ! dataAvailable() )` shows that the wait is repeated when only a
// connection/disconnection event was read. A failing select() is reported with nlerror.
// NOTE(review): the statement executed when usecMax exceeds 999999 (presumably a clamp),
// the declarations of `readers` and `tv`, and the loop header are missing from this
// listing - confirm against the full source.
480 void CBufServer::sleepUntilDataAvailable( uint usecMax
)
482 // Prevent looping infinitely if the system time was changed
483 if ( usecMax
> 999999 ) // limit not told in Linux man but here: http://docs.hp.com/en/B9106-90009/select.2.html
491 FD_SET( _DataAvailablePipeHandle
[PipeRead
], &readers
);
493 tv
.tv_usec
= usecMax
;
494 int res
= ::select( _DataAvailablePipeHandle
[PipeRead
]+1, &readers
, NULL
, NULL
, &tv
);
496 nlerror( "LNETL1: Select failed in sleepUntilDataAvailable (code %u)", CSock::getLastError() );
498 while ( ! dataAvailable() ); // will loop if only a connection/disconnection event was read
504 * Receives next block of data in the specified. The length and hostid are output arguments.
505 * Precond: dataAvailable() has returned true, phostid not null
507 void CBufServer::receive( CMemStream
& buffer
, TSockId
* phostid
)
509 nlnettrace( "CBufServer::receive" );
510 //nlassert( dataAvailable() );
511 nlassert( phostid
!= NULL
);
514 CFifoAccessor
recvfifo( &receiveQueue() );
515 nlassert( ! recvfifo
.value().empty() );
516 recvfifo
.value().front( buffer
);
517 recvfifo
.value().pop();
518 setDataAvailableFlag( ! recvfifo
.value().empty() );
521 // Extract hostid (and event type)
522 *phostid
= *((TSockId
*)&(buffer
.buffer()[buffer
.size()-sizeof(TSockId
)-1]));
523 nlassert( buffer
.buffer()[buffer
.size()-1] == CBufNetBase::User
);
525 buffer
.resize( buffer
.size()-sizeof(TSockId
)-1 );
527 // TODO OPTIM remove the nldebug for speed
528 //commented for optimisation LNETL1_DEBUG( "LNETL1: Read buffer (%d+%d B) from %s", buffer.size(), sizeof(TSockId)+1, /*stringFromVector(buffer).c_str(), */(*phostid)->asString().c_str() );
531 _BytesPoppedIn
+= buffer
.size() + sizeof(TBlockSize
);
536 * Update the network (call this method at regular intervals)
// Periodic maintenance of the sending side.
// For every connection of every receive task (pool and connection lists locked): the
// socket's update() is only called when the socket is still connected; when the socket is
// not connected, or update() returns false, the disconnection is logged and advertised
// through advertiseDisconnection(). The actual removal of the connection happens in
// dataAvailable(), as the inline comment states.
// NOTE(review): several source lines are missing from this listing (541-544, 566,
// 568-581), so other bookkeeping done here is not visible - confirm against the full source.
538 void CBufServer::update()
540 //nlnettrace( "CBufServer::update-BEGIN" );
545 CThreadPool::iterator ipt
;
547 //nldebug( "UPD: Acquiring the Thread Pool" );
548 CSynchronized
<CThreadPool
>::CAccessor
poolsync( &_ThreadPool
);
549 //nldebug( "UPD: Acquired." );
550 for ( ipt
=poolsync
.value().begin(); ipt
!=poolsync
.value().end(); ++ipt
)
552 // For each thread of the pool
553 CServerReceiveTask
*task
= receiveTask(ipt
);
554 CConnections::iterator ipb
;
556 CSynchronized
<CConnections
>::CAccessor
connectionssync( &task
->_Connections
);
557 for ( ipb
=connectionssync
.value().begin(); ipb
!=connectionssync
.value().end(); ++ipb
)
559 // For each socket of the thread, update sending
560 if ( ! ((*ipb
)->Sock
->connected() && (*ipb
)->update()) )
562 // Update did not work or the socket is not connected anymore
563 LNETL1_DEBUG( "LNETL1: Socket %s is disconnected", (*ipb
)->asString().c_str() );
564 // Disconnection event if disconnected (known either from flush (in update) or when receiving data)
565 (*ipb
)->advertiseDisconnection( this, *ipb
);
567 /*if ( (*ipb)->advertiseDisconnection( this, *ipb ) )
569 // Now the connection removal is in dataAvailable()
582 //nlnettrace( "CBufServer::update-END" );
585 uint32
CBufServer::getSendQueueSize( TSockId destid
)
587 if ( destid
!= InvalidSockId
)
589 if (_ConnectedClients
.find(destid
) == _ConnectedClients
.end())
591 // this host is not connected
595 return destid
->SendFifo
.size();
599 // add all client buffers
604 CThreadPool::iterator ipt
;
606 CSynchronized
<CThreadPool
>::CAccessor
poolsync( &_ThreadPool
);
607 for ( ipt
=poolsync
.value().begin(); ipt
!=poolsync
.value().end(); ++ipt
)
609 // For each thread of the pool
610 CServerReceiveTask
*task
= receiveTask(ipt
);
611 CConnections::iterator ipb
;
613 CSynchronized
<CConnections
>::CAccessor
connectionssync( &task
->_Connections
);
614 for ( ipb
=connectionssync
.value().begin(); ipb
!=connectionssync
.value().end(); ++ipb
)
616 // For each socket of the thread, update sending
617 total
= (*ipb
)->SendFifo
.size ();
626 void CBufServer::displayThreadStat (NLMISC::CLog
*log
)
629 CThreadPool::iterator ipt
;
631 CSynchronized
<CThreadPool
>::CAccessor
poolsync( &_ThreadPool
);
632 for ( ipt
=poolsync
.value().begin(); ipt
!=poolsync
.value().end(); ++ipt
)
634 // For each thread of the pool
635 CServerReceiveTask
*task
= receiveTask(ipt
);
636 // For each socket of the thread, update sending
637 log
->displayNL ("server receive thread %p nbloop %d", task
, task
->NbLoop
);
641 log
->displayNL ("server listen thread %p nbloop %d", _ListenTask
, _ListenTask
->NbLoop
);
644 void CBufServer::setTimeFlushTrigger( TSockId destid
, sint32 ms
)
646 nlassert( destid
!= InvalidSockId
);
647 if (_ConnectedClients
.find(destid
) != _ConnectedClients
.end())
648 destid
->setTimeFlushTrigger( ms
);
651 void CBufServer::setSizeFlushTrigger( TSockId destid
, sint32 size
)
653 nlassert( destid
!= InvalidSockId
);
654 if (_ConnectedClients
.find(destid
) != _ConnectedClients
.end())
655 destid
->setSizeFlushTrigger( size
);
// Flushes the send queue of one connection.
// destid is asserted to differ from InvalidSockId. When destid is registered in
// _ConnectedClients, the call is forwarded to destid->flush( nbBytesRemaining ) and its
// result is returned.
// NOTE(review): source lines 663-664 are missing from this listing, so the value returned
// for a host that is not in _ConnectedClients is not visible here - confirm against the
// full source.
658 bool CBufServer::flush( TSockId destid
, uint
*nbBytesRemaining
)
660 nlassert( destid
!= InvalidSockId
);
661 if (_ConnectedClients
.find(destid
) != _ConnectedClients
.end())
662 return destid
->flush( nbBytesRemaining
);
666 const CInetAddress
& CBufServer::hostAddress( TSockId hostid
)
668 nlassert( hostid
!= InvalidSockId
);
669 if (_ConnectedClients
.find(hostid
) != _ConnectedClients
.end())
670 return hostid
->Sock
->remoteAddr();
672 static CInetAddress nullAddr
;
676 void CBufServer::displaySendQueueStat (NLMISC::CLog
*log
, TSockId destid
)
678 if ( destid
!= InvalidSockId
)
680 if (_ConnectedClients
.find(destid
) == _ConnectedClients
.end())
682 // this host is not connected
686 destid
->SendFifo
.displayStats(log
);
690 // add all client buffers
693 CThreadPool::iterator ipt
;
695 CSynchronized
<CThreadPool
>::CAccessor
poolsync( &_ThreadPool
);
696 for ( ipt
=poolsync
.value().begin(); ipt
!=poolsync
.value().end(); ++ipt
)
698 // For each thread of the pool
699 CServerReceiveTask
*task
= receiveTask(ipt
);
700 CConnections::iterator ipb
;
702 CSynchronized
<CConnections
>::CAccessor
connectionssync( &task
->_Connections
);
703 for ( ipb
=connectionssync
.value().begin(); ipb
!=connectionssync
.value().end(); ++ipb
)
705 // For each socket of the thread, update sending
706 (*ipb
)->SendFifo
.displayStats(log
);
716 * Returns the number of bytes received since the previous call to this method
718 uint64
CBufServer::newBytesReceived()
720 uint64 b
= bytesReceived();
721 uint64 nbrecvd
= b
- _PrevBytesPoppedIn
;
722 //nlinfo( "b: %" NL_I64 "u new: %" NL_I64 "u", b, nbrecvd );
723 _PrevBytesPoppedIn
= b
;
728 * Returns the number of bytes sent since the previous call to this method
730 uint64
CBufServer::newBytesSent()
732 uint64 b
= bytesSent();
733 uint64 nbsent
= b
- _PrevBytesPushedOut
;
734 //nlinfo( "b: %" NL_I64 "u new: %" NL_I64 "u", b, nbsent );
735 _PrevBytesPushedOut
= b
;
740 /***************************************************************************************************
742 **************************************************************************************************/
746 * Code of listening thread
// Body of the listening thread.
// Increments NbServerListenTask on entry and decrements it on exit. Until exitRequired()
// is true it waits in select() for an incoming connection (on Unix the wake-up pipe is
// part of the select set too, so that wakeUp() can interrupt the wait), accepts the
// connection, wraps it in a CServerBufSock configured as non-blocking with
// _MaxExpectedBlockSize (and no-delay when the server asks for it), advertises the
// connection to the server and hands the socket over to dispatchNewSocket().
// An ESocket exception is only logged at info level.
// NOTE(review): platform guards, the switch on the select() result, declarations of
// `readers`, `descmax` and `b`, and the try block header are missing from this listing -
// confirm details against the full source.
748 void CListenTask::run()
751 NbServerListenTask
++;
753 nlnettrace( "CListenTask::run" );
758 descmax
= _ListenSock
.descriptor()>_WakeUpPipeHandle
[PipeRead
]?_ListenSock
.descriptor():_WakeUpPipeHandle
[PipeRead
];
761 // Accept connections
762 while ( ! exitRequired() )
766 LNETL1_DEBUG( "LNETL1: Waiting incoming connection..." );
767 // Get and setup the new socket
770 FD_SET( _ListenSock
.descriptor(), &readers
);
771 FD_SET( _WakeUpPipeHandle
[PipeRead
], &readers
);
772 int res
= ::select( descmax
+1, &readers
, NULL
, NULL
, NULL
); /// Wait indefinitely
776 //case 0 : continue; // time-out expired, no results
778 // we'll ignore message (Interrupted system call) caused by a CTRL-C
779 if (CSock::getLastError() == 4)
781 LNETL1_DEBUG ("LNETL1: Select failed (in listen thread): %s (code %u) but IGNORED", CSock::errorString( CSock::getLastError() ).c_str(), CSock::getLastError());
784 nlerror( "LNETL1: Select failed (in listen thread): %s (code %u)", CSock::errorString( CSock::getLastError() ).c_str(), CSock::getLastError() );
787 if ( FD_ISSET( _WakeUpPipeHandle
[PipeRead
], &readers
) )
790 if ( read( _WakeUpPipeHandle
[PipeRead
], &b
, 1 ) == -1 ) // we were woken-up by the wake-up pipe
792 LNETL1_DEBUG( "LNETL1: In CListenTask::run(): read() failed" );
794 LNETL1_DEBUG( "LNETL1: listen thread select woken-up" );
797 #elif defined (NL_OS_WINDOWS)
799 FD_SET( _ListenSock
.descriptor(), &readers
);
800 int res
= ::select( 1, &readers
, NULL
, NULL
, NULL
); /// Wait indefinitely
804 nlerror( "LNETL1: Select failed (in listen thread): %s (code %u)", CSock::errorString( CSock::getLastError() ).c_str(), CSock::getLastError() );
808 LNETL1_DEBUG( "LNETL1: Accepting an incoming connection..." );
809 CTcpSock
*newSock
= _ListenSock
.accept();
812 CServerBufSock
*bufsock
= new CServerBufSock( newSock
);
813 LNETL1_DEBUG( "LNETL1: New connection : %s", bufsock
->asString().c_str() );
814 bufsock
->setNonBlocking();
815 bufsock
->setMaxExpectedBlockSize( _MaxExpectedBlockSize
);
816 if ( _Server
->noDelay() )
818 bufsock
->Sock
->setNoDelay( true );
821 // Notify the new connection
822 bufsock
->advertiseConnection( _Server
);
824 // Dispatch the socket into the thread pool
825 _Server
->dispatchNewSocket( bufsock
);
830 catch (const ESocket
&e
)
832 LNETL1_INFO( "LNETL1: Exception in listen thread: %s", e
.what() );
833 // It can occur when too many sockets are open (e.g. 885 connections)
837 nlnettrace( "Exiting CListenTask::run" );
838 NbServerListenTask
--;
842 /// Close listening socket
// Documented as closing the listening socket.
// NOTE(review): the executable statements of this function are missing from this listing;
// only a commented-out _ListenSock.disconnect() is visible - confirm against the full source.
843 void CListenTask::close()
846 // _ListenSock.disconnect();
851 * Binds a new socket and send buffer to an existing or a new thread
852 * Note: this method is called in the listening thread.
// Assigns a freshly accepted socket to a receive thread, with the thread pool locked.
// SpreadSockets strategy: looks at numberOfConnections() of every receive task; a new
// thread is added when the pool is empty, or when min == max and the pool is still
// smaller than _MaxThreads; otherwise the socket goes to the task referenced by iptmin.
// A warning is logged when `min` reaches _MaxSocketsPerThread.
// FillThreads strategy: scans the pool for a task with fewer than _MaxSocketsPerThread
// connections; when the scan reaches the end, a new thread is added (with a warning when
// the pool size equals _MaxThreads), otherwise the socket goes to the task found.
// NOTE(review): the loop bodies that compute min / max / iptmin, the declaration of
// `max`, and the else branches are missing from this listing - confirm against the
// full source.
854 void CBufServer::dispatchNewSocket( CServerBufSock
*bufsock
)
856 nlnettrace( "CBufServer::dispatchNewSocket" );
858 CSynchronized
<CThreadPool
>::CAccessor
poolsync( &_ThreadPool
);
859 if ( _ThreadStrategy
== SpreadSockets
)
861 // Find the thread with the smallest number of connections and check if all
862 // threads do not have the same number of connections
863 uint min
= 0xFFFFFFFF;
865 CThreadPool::iterator ipt
, iptmin
, iptmax
;
866 for ( iptmin
=iptmax
=ipt
=poolsync
.value().begin(); ipt
!=poolsync
.value().end(); ++ipt
)
868 uint noc
= receiveTask(ipt
)->numberOfConnections();
881 // Check if we make the pool of threads grow (if we have not found vacant room
882 // and if it is allowed to)
883 if ( (poolsync
.value().empty()) ||
884 ((min
== max
) && (poolsync
.value().size() < _MaxThreads
)) )
886 addNewThread( poolsync
.value(), bufsock
);
890 // Dispatch socket to an existing thread of the pool
891 CServerReceiveTask
*task
= receiveTask(iptmin
);
892 bufsock
->setOwnerTask( task
);
893 task
->addNewSocket( bufsock
);
898 if ( min
>= (uint
)_MaxSocketsPerThread
)
900 nlwarning( "LNETL1: Exceeding the maximum number of sockets per thread" );
902 LNETL1_DEBUG( "LNETL1: New socket dispatched to thread %d", iptmin
-poolsync
.value().begin() );
906 else // _ThreadStrategy == FillThreads
908 CThreadPool::iterator ipt
;
909 for ( ipt
=poolsync
.value().begin(); ipt
!=poolsync
.value().end(); ++ipt
)
911 uint noc
= receiveTask(ipt
)->numberOfConnections();
912 if ( noc
< _MaxSocketsPerThread
)
918 // Check if we have to make the thread pool grow (if we have not found vacant room)
919 if ( ipt
== poolsync
.value().end() )
921 if ( poolsync
.value().size() == _MaxThreads
)
923 nlwarning( "LNETL1: Exceeding the maximum number of threads" );
925 addNewThread( poolsync
.value(), bufsock
);
929 // Dispatch socket to an existing thread of the pool
930 CServerReceiveTask
*task
= receiveTask(ipt
);
931 bufsock
->setOwnerTask( task
);
932 task
->addNewSocket( bufsock
);
936 LNETL1_DEBUG( "LNETL1: New socket dispatched to thread %d", ipt
-poolsync
.value().begin() );
943 * Creates a new task and run a new thread for it
944 * Precond: bufsock not null
// Creates a new CServerReceiveTask that owns bufsock, creates a thread object for that
// task and appends the thread to threadpool. bufsock is asserted to be non-NULL.
// The caller (dispatchNewSocket) passes the pool while holding its accessor.
// NOTE(review): source line 960 is missing from this listing; presumably that is where
// the new thread is started - confirm against the full source.
946 void CBufServer::addNewThread( CThreadPool
& threadpool
, CServerBufSock
*bufsock
)
948 nlnettrace( "CBufServer::addNewThread" );
949 nlassert( bufsock
!= NULL
);
951 // Create new task and dispatch the socket to it
952 CServerReceiveTask
*task
= new CServerReceiveTask( this );
953 bufsock
->setOwnerTask( task
);
954 task
->addNewSocket( bufsock
);
956 // Add a new thread to the pool, with this task
957 IThread
*thr
= IThread::create( task
, 1024*4*4 );
959 threadpool
.push_back( thr
);
961 LNETL1_DEBUG( "LNETL1: Added a new thread; pool size is %d", threadpool
.size() );
962 LNETL1_DEBUG( "LNETL1: New socket dispatched to thread %d", threadpool
.size()-1 );
967 /***************************************************************************************************
969 **************************************************************************************************/
973 * Code of receiving threads for servers
// Body of a receiving thread.
// Increments NbServerReceiveTask on entry and decrements it on exit. Until exitRequired()
// is true, each iteration:
//  1. removes closed connections (clearClosedConnections());
//  2. locks _Connections, copies the connected sockets into connections_copy and builds
//     the select() read set from their descriptors plus the wake-up pipe;
//  3. sleeps 1 ms instead of calling select() when there is no connected socket,
//     otherwise calls select() (with a timeout on Windows, without one on Unix);
//  4. for every socket flagged readable, calls receivePart(); once a block is complete,
//     the sock id and event type are filled in and the buffer is pushed into the server's
//     receive queue; an ESocket exception disconnects the socket;
//  5. reads one byte from the wake-up pipe when it was flagged readable.
// NOTE(review): many source lines are missing from this listing (declarations of
// `readers`, `descmax`, `skip`, `tv`, `b`, `bufsize`, `mbsize`; the switch on the select()
// result; try headers; platform guards) - confirm details against the full source.
975 void CServerReceiveTask::run()
978 NbServerReceiveTask
++;
979 nlnettrace( "CServerReceiveTask::run" );
984 #if defined NL_OS_UNIX
986 if (nice( 2 ) == -1) // is this really useful as long as select() sleeps?
988 nlwarning("LNETL1: nice() failed: code=%d '%s'", errno
, strerror(errno
));
992 // Copy of _Connections
993 vector
<TSockId
> connections_copy
;
995 while ( ! exitRequired() )
997 // 1. Remove closed connections
998 clearClosedConnections();
1002 // 2-SELECT-VERSION : select() on the sockets handled in the present thread
1005 FD_ZERO( &readers
);
1007 bool alldisconnected
= true;
1008 CConnections::iterator ipb
;
1010 // Lock _Connections
1011 CSynchronized
<CConnections
>::CAccessor
connectionssync( &_Connections
);
1013 // Prepare to avoid select if there is no connection
1014 skip
= connectionssync
.value().empty();
1016 // Fill the select array and copy _Connections
1017 connections_copy
.clear();
1018 for ( ipb
=connectionssync
.value().begin(); ipb
!=connectionssync
.value().end(); ++ipb
)
1020 if ( (*ipb
)->Sock
->connected() ) // exclude disconnected sockets that are not deleted
1022 alldisconnected
= false;
1023 // Copy _Connections element
1024 connections_copy
.push_back( *ipb
);
1026 // Add socket descriptor to the select array
1027 FD_SET( (*ipb
)->Sock
->descriptor(), &readers
);
1029 // Calculate descmax for select
1030 if ( (*ipb
)->Sock
->descriptor() > descmax
)
1032 descmax
= (*ipb
)->Sock
->descriptor();
1038 // Add the wake-up pipe into the select array
1039 FD_SET( _WakeUpPipeHandle
[PipeRead
], &readers
);
1040 if ( _WakeUpPipeHandle
[PipeRead
]>descmax
)
1042 descmax
= _WakeUpPipeHandle
[PipeRead
];
1046 // Unlock _Connections, use connections_copy instead
1050 // Avoid select if there is no connection (Windows only)
1051 if ( skip
|| alldisconnected
)
1053 nlSleep( 1 ); // nice
1058 #ifdef NL_OS_WINDOWS
1060 tv
.tv_sec
= 0; // short time because the newly added connections can't be added to the select fd_set
1064 int res
= ::select( descmax
+1, &readers
, NULL
, NULL
, &tv
);
1066 #elif defined NL_OS_UNIX
1069 int res
= ::select( descmax
+1, &readers
, NULL
, NULL
, NULL
);
1071 #endif // NL_OS_WINDOWS
1076 // 3. Test the result
1079 #ifdef NL_OS_WINDOWS
1080 case 0 : continue; // time-out expired, no results
1083 // we'll ignore message (Interrupted system call) caused by a CTRL-C
1084 /*if (CSock::getLastError() == 4)
1086 LNETL1_DEBUG ("LNETL1: Select failed (in receive thread): %s (code %u) but IGNORED", CSock::errorString( CSock::getLastError() ).c_str(), CSock::getLastError());
1089 //nlerror( "LNETL1: Select failed (in receive thread): %s (code %u)", CSock::errorString( CSock::getLastError() ).c_str(), CSock::getLastError() );
1090 LNETL1_DEBUG( "LNETL1: Select failed (in receive thread): %s (code %u)", CSock::errorString( CSock::getLastError() ).c_str(), CSock::getLastError() );
1096 vector
<TSockId
>::iterator ic
;
1097 for ( ic
=connections_copy
.begin(); ic
!=connections_copy
.end(); ++ic
)
1099 if ( FD_ISSET( (*ic
)->Sock
->descriptor(), &readers
) != 0 )
1101 CServerBufSock
*serverbufsock
= static_cast<CServerBufSock
*>(static_cast<CBufSock
*>(*ic
));
1105 if ( serverbufsock
->receivePart( sizeof(TSockId
) + 1 ) ) // +1 for the event type
1107 serverbufsock
->fillSockIdAndEventType( *ic
);
1109 // Push message into receive queue
1113 _Server
->pushMessageIntoReceiveQueue( serverbufsock
->receivedBuffer() );
1115 //recvfifo.value().display();
1116 //bufsize = serverbufsock->receivedBuffer().size();
1117 //mbsize = recvfifo.value().size() / 1048576;
1118 //nldebug( "RCV: Released." );
1121 nlwarning( "The receive queue size exceeds %d MB", mbsize );
1126 CSynchronized<uint32>::CAccessor syncbpi ( &_Server->syncBytesPushedIn() );
1127 syncbpi.value() += bufsize;
1132 // catch (const ESocketConnectionClosed&)
1134 // LNETL1_DEBUG( "LNETL1: Connection %s closed", serverbufsock->asString().c_str() );
1135 // // The socket went to _Connected=false when throwing the exception
1137 catch (const ESocket
&)
1139 LNETL1_DEBUG( "LNETL1: Connection %s broken", serverbufsock
->asString().c_str() );
1140 (*ic
)->Sock
->disconnect();
1144 skip = true; // don't check _WakeUpPipeHandle (yes, check it to read any written byte)
1152 // Test wake-up pipe
1153 if ( (!skip
) && (FD_ISSET( _WakeUpPipeHandle
[PipeRead
], &readers
)) )
1156 if ( read( _WakeUpPipeHandle
[PipeRead
], &b
, 1 ) == -1 ) // we were woken-up by the wake-up pipe
1158 LNETL1_DEBUG( "LNETL1: In CServerReceiveTask::run(): read() failed" );
1160 LNETL1_DEBUG( "LNETL1: Receive thread select woken-up" );
1167 nlnettrace( "Exiting CServerReceiveTask::run" );
1168 NbServerReceiveTask
--;
1174 * Delete all connections referenced in the remove list (double-mutexed)
// Removes from _Connections every socket id listed in _RemoveSet, then empties _RemoveSet.
// _RemoveSet is locked first; _Connections is only locked when the remove set is not
// empty, and both accessors are held while the entries are erased.
// NOTE(review): the statement under "Delete the socket object" (source line 1197) is
// missing from this listing; presumably `sid` is deleted there - confirm against the
// full source.
1177 void CServerReceiveTask::clearClosedConnections()
1179 CConnections::iterator ic
;
1181 NLMISC::CSynchronized
<CConnections
>::CAccessor
removesetsync( &_RemoveSet
);
1183 if ( ! removesetsync
.value().empty() )
1185 // Delete closed connections
1186 NLMISC::CSynchronized
<CConnections
>::CAccessor
connectionssync( &_Connections
);
1187 for ( ic
=removesetsync
.value().begin(); ic
!=removesetsync
.value().end(); ++ic
)
1189 LNETL1_DEBUG( "LNETL1: Removing a connection" );
1191 TSockId sid
= (*ic
);
1193 // Remove from the connection list
1194 connectionssync
.value().erase( *ic
);
1196 // Delete the socket object
1199 // Clear remove list
1200 removesetsync
.value().clear();
// Registers the two global thread counters under the "nel" category with their help
// strings, through NLMISC_CATEGORISED_VARIABLE. The counters themselves are maintained by
// CListenTask::run() and CServerReceiveTask::run().
1206 NLMISC_CATEGORISED_VARIABLE(nel
, uint32
, NbServerListenTask
, "Number of server listen thread");
1207 NLMISC_CATEGORISED_VARIABLE(nel
, uint32
, NbServerReceiveTask
, "Number of server receive thread");