1 // NeL - MMORPG Framework <http://dev.ryzom.com/projects/nel/>
2 // Copyright (C) 2010 Winch Gate Property Limited
4 // This source file has been modified by the following contributors:
5 // Copyright (C) 2014 Jan BOON (Kaetemi) <jan.boon@kaetemi.be>
7 // This program is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Affero General Public License as
9 // published by the Free Software Foundation, either version 3 of the
10 // License, or (at your option) any later version.
12 // This program is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU Affero General Public License for more details.
17 // You should have received a copy of the GNU Affero General Public License
18 // along with this program. If not, see <http://www.gnu.org/licenses/>.
22 #include "nel/misc/hierarchical_timer.h"
24 #include "nel/net/buf_client.h"
25 #include "nel/misc/thread.h"
26 #include "nel/net/dummy_tcp_sock.h"
27 #include "nel/net/net_log.h"
30 # ifndef NL_COMP_MINGW
34 #elif defined NL_OS_UNIX
35 # include <netinet/in.h>
42 using namespace NLMISC
;
49 uint32 NbClientReceiveTask
= 0;
52 /***************************************************************************************************
53 * User main thread (initialization)
54 **************************************************************************************************/
60 CBufClient::CBufClient( bool nodelay
, bool replaymode
, bool initPipeForDataAvailable
) :
61 CBufNetBase( initPipeForDataAvailable
),
63 CBufClient::CBufClient( bool nodelay
, bool replaymode
, bool ) :
67 _PrevBytesDownloaded( 0 ),
68 _PrevBytesUploaded( 0 ),
71 /*_PrevBytesReceived( 0 ),
74 nlnettrace( "CBufClient::CBufClient" ); // don't define a global object
78 _BufSock
= new CNonBlockingBufSock( new CDummyTcpSock(), CBufNetBase::DefaultMaxExpectedBlockSize
);
83 _BufSock
= new CNonBlockingBufSock( NULL
, CBufNetBase::DefaultMaxExpectedBlockSize
);
84 _RecvTask
= new CClientReceiveTask( this, _BufSock
);
90 * Connects to the specified host
91 * Precond: not connected
93 void CBufClient::connect( const CInetAddress
& addr
)
95 nlnettrace( "CBufClient::connect" );
96 nlassert( ! _BufSock
->Sock
->connected() );
97 _BufSock
->setMaxExpectedBlockSize( maxExpectedBlockSize() );
98 _BufSock
->connect( addr
, _NoDelay
, true );
99 _BufSock
->setNonBlocking(); // ADDED: non-blocking client connection
100 _PrevBytesDownloaded
= 0;
101 _PrevBytesUploaded
= 0;
102 /*_PrevBytesReceived = 0;
103 _PrevBytesSent = 0;*/
105 // Allow reconnection
106 if ( _RecvThread
!= NULL
)
111 _RecvThread
= IThread::create( _RecvTask
, 1024*4*4 );
112 _RecvThread
->start();
116 /***************************************************************************************************
117 * User main thread (running)
118 **************************************************************************************************/
120 void CBufClient::displayThreadStat (NLMISC::CLog
*log
)
122 log
->displayNL ("client thread %p nbloop %d", _RecvTask
, _RecvTask
->NbLoop
);
127 * Sends a message to the remote host
129 void CBufClient::send( const NLMISC::CMemStream
& buffer
)
131 nlnettrace( "CBufClient::send" );
132 nlassert( buffer
.length() > 0 );
133 nlassert( buffer
.length() <= maxSentBlockSize() );
135 // slow down the layer H_AUTO (CBufServer_send);
137 if ( ! _BufSock
->pushBuffer( buffer
) )
139 // Disconnection event if disconnected
140 _BufSock
->advertiseDisconnection( this, NULL
);
146 * Checks if there are some data to receive
148 bool CBufClient::dataAvailable()
150 // slow down the layer H_AUTO (CBufClient_dataAvailable);
152 /* If no data available, enter the 'while' loop and return false (1 volatile test)
153 * If there are user data available, enter the 'while' and return true immediately (1 volatile test + 1 short locking)
154 * If there is a disconnection event (rare), call the callback and loop
156 while ( dataAvailableFlag() )
158 // Because _DataAvailable is true, the receive queue is not empty at this point
161 CFifoAccessor
recvfifo( &receiveQueue() );
162 val
= recvfifo
.value().frontLast ();
167 if ( read( _DataAvailablePipeHandle
[PipeRead
], &b
, 1 ) == -1 )
168 nlwarning( "LNETL1: Read pipe failed in dataAvailable" );
169 //nldebug( "Pipe: 1 byte read (client %p)", this );
172 // Test if it the next block is a system event
176 // Normal message available
177 case CBufNetBase::User
:
178 return true; // return immediatly, do not extract the message
180 // Process disconnection event
181 case CBufNetBase::Disconnection
:
183 LNETL1_DEBUG( "LNETL1: Disconnection event" );
184 _BufSock
->setConnectedState( false );
186 // Call callback if needed
187 if ( disconnectionCallback() != NULL
)
189 disconnectionCallback()( id(), argOfDisconnectionCallback() );
192 // Unlike the server version, we do not delete the CBufSock object here,
193 // it will be done in the destructor of CBufClient
197 default: // should not occur
199 CFifoAccessor
recvfifo( &receiveQueue() );
200 vector
<uint8
> buffer
;
201 recvfifo
.value().front (buffer
);
202 LNETL1_INFO( "LNETL1: Invalid block type: %hu (should be = %hu)", (uint16
)(buffer
[buffer
.size()-1]), (uint16
)val
);
203 LNETL1_INFO( "LNETL1: Buffer (%d B): [%s]", buffer
.size(), stringFromVector(buffer
).c_str() );
204 LNETL1_INFO( "LNETL1: Receive queue:" );
205 recvfifo
.value().display();
206 nlerror( "LNETL1: Invalid system event type in client receive queue" );
209 // Extract system event
211 CFifoAccessor
recvfifo( &receiveQueue() );
212 recvfifo
.value().pop();
213 setDataAvailableFlag( ! recvfifo
.value().empty() );
217 // _DataAvailable is false here
224 /* Wait until the receive queue contains something to read (implemented with a select()).
225 * This is where the connection/disconnection callbacks can be called.
226 * \param usecMax Max time to wait in microsecond (up to 1 sec)
228 void CBufClient::sleepUntilDataAvailable( uint usecMax
)
230 // Prevent looping infinitely if the system time was changed
231 if ( usecMax
> 999999 ) // limit not told in Linux man but here: http://docs.hp.com/en/B9106-90009/select.2.html
239 FD_SET( _DataAvailablePipeHandle
[PipeRead
], &readers
);
241 tv
.tv_usec
= usecMax
;
242 int res
= ::select( _DataAvailablePipeHandle
[PipeRead
]+1, &readers
, NULL
, NULL
, &tv
);
244 nlerror( "LNETL1: Select failed in sleepUntilDataAvailable (code %u)", CSock::getLastError() );
246 while ( ! dataAvailable() ); // will loop if only a connection/disconnection event was read
252 * Receives next block of data in the specified buffer (resizes the vector)
253 * Precond: dataAvailable() has returned true
255 void CBufClient::receive( NLMISC::CMemStream
& buffer
)
257 nlnettrace( "CBufClient::receive" );
258 //nlassert( dataAvailable() );
260 // Extract buffer from the receive queue
262 CFifoAccessor
recvfifo( &receiveQueue() );
263 nlassert( ! recvfifo
.value().empty() );
264 recvfifo
.value().front( buffer
);
265 recvfifo
.value().pop();
266 setDataAvailableFlag( ! recvfifo
.value().empty() );
269 // Extract event type
270 nlassert( buffer
.buffer()[buffer
.size()-1] == CBufNetBase::User
);
271 //commented for optimisation LNETL1_DEBUG( "LNETL1: Client read buffer (%d+%d B)", buffer.size(), sizeof(TSockId)+1 );
272 buffer
.resize( buffer
.size()-1 );
277 * Update the network (call this method evenly)
279 void CBufClient::update()
281 //nlnettrace( "CBufClient::update" );
284 bool sendingok
= _BufSock
->update();
286 // Disconnection event if disconnected
287 if ( ! ( _BufSock
->Sock
->connected() && sendingok
) )
289 if ( _BufSock
->Sock
->connected() )
291 _BufSock
->Sock
->disconnect();
293 _BufSock
->advertiseDisconnection( this, NULL
);
299 * Disconnect the remote host
301 void CBufClient::disconnect( bool quick
)
303 nlnettrace( "CBufClient::disconnect" );
305 // Do not allow to disconnect a socket that is not connected
306 nlassert( _BufSock
->connectedState() );
308 // When the NS tells us to remove this connection AND the connection has physically
309 // disconnected but not yet logically (i.e. disconnection event not processed yet),
310 // skip flushing and physical active disconnection
311 if ( _BufSock
->Sock
->connected() )
313 // Flush sending is asked for
319 // Disconnect and prevent from advertising the disconnection
320 _BufSock
->disconnect( false );
323 // Empty the receive queue
325 CFifoAccessor
recvfifo( &receiveQueue() );
326 recvfifo
.value().clear();
327 setDataAvailableFlag( false );
332 // Utility function for newBytes...()
333 inline uint64
updateStatCounter( uint64
& counter
, uint64 newvalue
)
335 uint64 result
= newvalue
- counter
;
342 * Returns the number of bytes downloaded since the previous call to this method
344 uint64
CBufClient::newBytesDownloaded()
346 return updateStatCounter( _PrevBytesDownloaded
, bytesDownloaded() );
351 * Returns the number of bytes uploaded since the previous call to this method
353 uint64
CBufClient::newBytesUploaded()
355 return updateStatCounter( _PrevBytesUploaded
, bytesUploaded() );
360 * Returns the number of bytes popped by receive() since the previous call to this method
362 /*uint64 CBufClient::newBytesReceived()
364 return updateStatCounter( _PrevBytesReceived, bytesReceived() );
369 * Returns the number of bytes pushed by send() since the previous call to this method
371 /*uint64 CBufClient::newBytesSent()
373 return updateStatCounter( _PrevBytesSent, bytesSent() );
380 CBufClient::~CBufClient()
382 nlnettrace( "CBufClient::~CBufClient" );
384 // Disconnect if not done
385 if ( _BufSock
->Sock
->connected() )
387 nlassert( _BufSock
->connectedState() );
392 // Clean thread termination
393 if ( _RecvThread
!= NULL
)
395 LNETL1_DEBUG( "LNETL1: Waiting for the end of the receive thread..." );
399 if ( _RecvTask
!= NULL
)
402 if ( _RecvThread
!= NULL
)
405 if ( _BufSock
!= NULL
)
408 nlnettrace( "Exiting CBufClient::~CBufClient" );
412 /***************************************************************************************************
414 **************************************************************************************************/
418 * Code of receiving thread for clients
420 void CClientReceiveTask::run()
422 NbClientReceiveTask
++;
424 nlnettrace( "CClientReceiveTask::run" );
426 // 18/08/2005 : sonix : Changed time out from 60s to 1s, in some case, it
427 // can generate a 60 s wait on destruction of the CBufSock
428 // By the way, checking every 1s is not a time consuming
429 _NBBufSock
->Sock
->setTimeOutValue( 1, 0 );
431 bool connected
= true;
432 while ( connected
&& _NBBufSock
->Sock
->connected())
436 // ADDED: non-blocking client connection
438 // Wait until some data are received (sleepin' select inside)
439 while ( ! _NBBufSock
->Sock
->dataAvailable() )
441 if ( ! _NBBufSock
->Sock
->connected() )
443 LNETL1_DEBUG( "LNETL1: Client connection %s closed", sockId()->asString().c_str() );
444 // The socket went to _Connected=false when throwing the exception
450 // Process the data received
451 if ( _NBBufSock
->receivePart( 1 ) ) // 1 for the event type
453 //commented out for optimisation: LNETL1_DEBUG( "LNETL1: Client %s received buffer (%u bytes)", _SockId->asString().c_str(), buffer.size()/*, stringFromVector(buffer).c_str()*/ );
455 _NBBufSock
->fillEventTypeOnly();
457 // Push message into receive queue
458 _Client
->pushMessageIntoReceiveQueue( _NBBufSock
->receivedBuffer() );
463 catch (const ESocket
&)
465 LNETL1_DEBUG( "LNETL1: Client connection %s broken", sockId()->asString().c_str() );
466 sockId()->Sock
->disconnect();
471 nlnettrace( "Exiting CClientReceiveTask::run()" );
472 NbClientReceiveTask
--;
476 NLMISC_CATEGORISED_VARIABLE(nel
, uint32
, NbClientReceiveTask
, "Number of client receive thread");