1 // NeL - MMORPG Framework <http://dev.ryzom.com/projects/nel/>
2 // Copyright (C) 2010 Winch Gate Property Limited
4 // This source file has been modified by the following contributors:
5 // Copyright (C) 2014-2016 Jan BOON (Kaetemi) <jan.boon@kaetemi.be>
7 // This program is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Affero General Public License as
9 // published by the Free Software Foundation, either version 3 of the
10 // License, or (at your option) any later version.
12 // This program is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU Affero General Public License for more details.
17 // You should have received a copy of the GNU Affero General Public License
18 // along with this program. If not, see <http://www.gnu.org/licenses/>.
24 #include "nel/misc/types_nl.h"
34 # include <sys/stat.h>
35 # define mkdir(a) mkdir(a,0755)
39 #include "nel/misc/common.h"
40 #include "nel/misc/debug.h"
41 #include "nel/misc/mem_stream.h"
42 #include "nel/misc/path.h"
44 #include "nel/net/service.h"
45 #include "nel/net/udp_sock.h"
47 #include "receive_task.h"
50 # ifndef NL_COMP_MINGW
54 #endif // NL_OS_WINDOWS
65 using namespace NLMISC
;
66 using namespace NLNET
;
74 CClient (TSockId from
, uint32 session
, const string
&cn
) : From(from
), Session(session
), NextPingNumber(0), LastPongReceived(0), ConnectionName(cn
),
75 BlockNumber(0), FullMeanPongTime(0), FullNbPong(0), NbPing(0), NbPong(0), MeanPongTime(0), NbDuplicated(0), FirstWrite(true)
76 { PongReceived
.resize (1001); }
78 CInetAddress Address
; // udp address
79 TSockId From
; // used to find the TCP connection
80 uint32 Session
; // used to find the link between UDP and TCP connection at startup
82 vector
<pair
<uint8
, uint16
> > PongReceived
; // contains the number of pong receive for each message number and the time
84 uint32 NextPingNumber
, LastPongReceived
;
85 string ConnectionName
;
87 // this number is increase each time we filled the PongReceived array, the goal is to avoid a very old packet to use as a new one
90 uint32 FullMeanPongTime
, FullNbPong
;
92 // used for stat, reset every stat update
93 uint32 NbPing
, NbPong
, MeanPongTime
, NbDuplicated
;
95 // true if the client just connect and we don't log stat
98 void updatePong (sint64 pingTime
, sint64 pongTime
, uint32 pongNumber
, uint32 blockNumber
);
100 void updateFullStat ();
103 struct TInetAddressHash
105 enum { bucket_size
= 4, min_buckets
= 8, };
107 inline bool operator() (const NLNET::CInetAddress
&x1
, const NLNET::CInetAddress
&x2
) const
113 inline size_t operator() ( const NLNET::CInetAddress
& x
) const
116 //return x.internalIPAddress();
124 typedef CHashMap
<NLNET::CInetAddress
,CClient
*,TInetAddressHash
> TClientMap
;
125 #define GETCLIENTA(it) (*it).second
131 // must be increase at each version and must be the same value as the client
134 string StatPathName
= "stats/";
136 uint16 UDPPort
= 45455;
137 uint16 TCPPort
= 45456;
138 uint32 MaxUDPPacketSize
= 1000;
140 CBufFIFO Queue1
, Queue2
;
142 CBufFIFO
*CurrentReadQueue
= NULL
;
144 TReceivedMessage
*CurrentInMsg
= NULL
;
146 IThread
*ReceiveThread
= NULL
;
147 CReceiveTask
*ReceiveTask
= NULL
;
149 list
<CClient
> Clients
; // contains all clients
150 TClientMap ClientMap
; // contains a quick access to the client using the udp address
152 // TCP server for clients
153 CCallbackServer
*CallbackServer
= NULL
;
164 newtime
= localtime( &long_time
);
167 string res
= toString("%02d", newtime
->tm_year
-100) + "_";
168 res
+= toString("%02d", newtime
->tm_mon
+1) + "_";
169 res
+= toString("%02d", newtime
->tm_mday
);
173 return "bad date "+toString( (uint32
)long_time
);
180 void cbInit (CMessage
&msgin
, TSockId from
, CCallbackNetBase
&netbase
)
182 uint64 session
= (uint64
)(uintptr_t) from
;
184 string connectionName
;
185 msgin
.serial (connectionName
);
190 msgin
.serial (version
);
191 if (version
!= Version
)
193 // bad client version, disconnect it
194 CallbackServer
->disconnect (from
);
198 catch (const Exception
&)
200 // bad client version, disconnect it
201 CallbackServer
->disconnect (from
);
205 CMessage
msgout ("INIT");
206 msgout
.serial (session
);
207 CallbackServer
->send (msgout
, from
);
209 Clients
.push_back(CClient(from
, (uint32
)session
, connectionName
));
210 nlinfo ("Added client TCP %s, session %x", from
->asString().c_str(), session
);
213 void cbDisconnect (TSockId from
, void *arg
)
215 for (list
<CClient
>::iterator it
= Clients
.begin(); it
!= Clients
.end(); it
++)
217 if ((*it
).From
== from
)
220 (*it
).updateFullStat();
221 nlinfo( "Removing client %s", (*it
).Address
.asString().c_str() );
222 ClientMap
.erase ((*it
).Address
);
233 TCallbackItem CallbackArray
[] =
240 void CClient::updatePong (sint64 pingTime
, sint64 pongTime
, uint32 pongNumber
, uint32 blockNumber
)
242 // it means that it s a too old packet, discard it
243 if (blockNumber
!= BlockNumber
)
246 // add the pong in the array to detect lost, duplication
247 if (pongNumber
>= PongReceived
.size())
249 // if the array is too big, we flush actual data and restart all
254 PongReceived
[pongNumber
].first
++;
256 if (PongReceived
[pongNumber
].first
> 1)
262 // increase only for new pong
264 MeanPongTime
+= (uint32
)(pongTime
-pingTime
);
267 FullMeanPongTime
+= (uint32
)(pongTime
-pingTime
);
269 PongReceived
[pongNumber
].second
= (uint16
)(pongTime
- pingTime
);
272 if (pongNumber
> LastPongReceived
)
273 LastPongReceived
= pongNumber
;
275 // write each pong in a file
276 string ha
= Address
.hostName();
279 ha
= Address
.ipAddress();
281 string fn
= StatPathName
+ ConnectionName
+ "_" + ha
+ "_" + getDate() + ".pong";
283 FILE *fp
= nlfopen (fn
, "rt");
286 // new file, add the header
287 FILE *fp
= nlfopen (fn
, "wt");
290 fprintf (fp
, "#%s\t%s\t%s\t%s\n", "PingTime", "PongTime", "Delta", "PingNumber");
299 fp
= nlfopen (fn
, "at");
302 nlwarning ("Can't open pong file name '%s'", fn
.c_str());
306 fprintf (fp
, "%" NL_I64
"d\t%" NL_I64
"d\t%" NL_I64
"d\t%d\n", pongTime
, pingTime
, (pongTime
-pingTime
), pongNumber
);
311 void CClient::updateFullStat ()
313 uint32 NbLost
= 0, NbDup
= 0, NbPong
= 0;
315 /* if (Address.hostName().empty())
317 // don't log because we receive no pong at all
321 for (uint i
= 0; i
< LastPongReceived
; i
++)
323 if (PongReceived
[i
].first
== 0) NbLost
++;
327 NbDup
+= PongReceived
[i
].first
- 1;
332 // write each pong in a file
333 string ha
= Address
.hostName();
336 ha
= Address
.ipAddress();
338 string fn
= StatPathName
+ ConnectionName
+ "_" + ha
+ "_" + getDate() + ".stat";
340 string line
= "Full Summary: ";
341 line
+= "NbPing " + toString(LastPongReceived
) + " ";
342 line
+= "NbPong " + toString(NbPong
) + " ";
343 line
+= "NbLost " + toString(NbLost
) + " ";
344 if (LastPongReceived
>0) line
+= "(" + toString((float)NbLost
/LastPongReceived
*100.0f
) + "pc) ";
345 line
+= "NbDuplicated " + toString(NbDup
) + " ";
346 if (LastPongReceived
>0) line
+= "(" + toString((float)NbDup
/LastPongReceived
*100.0f
) + "pc) ";
349 line
+= "MeanPongTime <Undef> ";
351 line
+= "MeanPongTime " + toString(FullMeanPongTime
/FullNbPong
) + " ";
353 FILE *fp
= fopen (fn
.c_str(), "at");
356 nlwarning ("Can't open stat file name '%s'", fn
.c_str());
360 fprintf (fp
, "%s\n", line
.c_str());
363 // send the full sumary to the client
364 CMessage
msgout("INFO");
366 CallbackServer
->send (msgout
, From
);
369 nlinfo (line
.c_str());
374 // write each ping in a file
375 string ha
= Address
.hostName();
378 ha
= Address
.ipAddress();
380 string fn
= StatPathName
+ ConnectionName
+ "_" + ha
+ "_" + getDate() + ".ping";
382 FILE *fp
= fopen (fn
.c_str(), "rt");
385 // new file, add the header
386 FILE *fp
= fopen (fn
.c_str(), "wt");
389 fprintf (fp
, "#%s\t%s\n", "NbPongRcv", "Delta");
398 fp
= fopen (fn
.c_str(), "at");
401 nlwarning ("Can't open ping file name '%s'", fn
.c_str());
405 // add a fake value to know that it s a different session
406 fprintf (fp
, "-1\t0\n");
407 for (uint i
= 0; i
< LastPongReceived
; i
++)
409 fprintf (fp
, "%d\t%d\n", PongReceived
[i
].first
, PongReceived
[i
].second
);
415 // clear all structures
417 PongReceived
.clear ();
418 PongReceived
.resize (1001);
422 NextPingNumber
= LastPongReceived
= 0;
424 FullMeanPongTime
= FullNbPong
= 0;
426 // NbPing = NbPong = MeanPongTime = NbDuplicated = 0;
429 void CClient::updateStat ()
431 // write each pong in a file
432 string ha
= Address
.hostName();
435 ha
= Address
.ipAddress();
437 string fn
= StatPathName
+ ConnectionName
+ "_" + ha
+ "_" + getDate() + ".stat";
440 line
+= "NbPing " + toString(NbPing
) + " ";
441 line
+= "NbPong " + toString(NbPong
) + " ";
443 line
+= "MeanPongTime <Undef> ";
445 line
+= "MeanPongTime " + toString(MeanPongTime
/NbPong
) + " ";
446 line
+= "NbDuplicated " + toString(NbDuplicated
) + " ";
448 FILE *fp
= fopen (fn
.c_str(), "at");
451 nlwarning ("Can't open stat file name '%s'", fn
.c_str());
457 //nlassert (!Address.hostName().empty())
458 fprintf (fp
, "HostAddress: %s\n", Address
.asString().c_str());
462 fprintf (fp
, "%s\n", line
.c_str());
466 nlinfo (line
.c_str());
468 CMessage
msgout("INFO");
470 CallbackServer
->send (msgout
, From
);
472 NbPing
= NbPong
= MeanPongTime
= NbDuplicated
= 0;
477 static sint64 lastUpdate
= CTime::getLocalTime ();
479 if (CTime::getLocalTime() - lastUpdate
< 2*1000)
482 lastUpdate
= CTime::getLocalTime();
484 // update stat only at the linked UDP-TCP connection
485 for (TClientMap::iterator it
= ClientMap
.begin (); it
!= ClientMap
.end(); it
++)
487 GETCLIENTA(it
)->updateStat ();
497 void removeClientByAddr( TClientMap::iterator iclient
)
499 if ( iclient
== ClientMap
.end() )
501 // It may have already been removed on purpose
505 for (list
<CClient
>::iterator it
= Clients
.begin(); it
!= Clients
.end(); it
++)
507 if ((*it
).Address
== (*iclient
).first
)
509 (*it
).updateFullStat();
510 nlinfo( "Removing client %s", GETCLIENTA(iclient
)->Address
.asString().c_str() );
515 ClientMap
.erase( iclient
);
518 void handleReceivedPong (CClient
*client
, sint64 pongTime
)
521 nlassert( CurrentInMsg
&& (! CurrentInMsg
->data().empty()) );
523 // Prepare message to read
524 CMemStream
msgin( true );
525 uint32 currentsize
= CurrentInMsg
->userSize();
527 memcpy (msgin
.bufferToFill (currentsize
), CurrentInMsg
->userDataR(), currentsize
);
535 // init the UDP connection
539 msgin
.serial (session
);
541 // Find a new udp connection, find the linked
542 list
<CClient
>::iterator it
;
543 for (it
= Clients
.begin(); it
!= Clients
.end(); it
++)
545 if ((*it
).Session
== session
)
548 // Found it, add in the map
549 client
->Address
= CurrentInMsg
->AddrFrom
;
550 ClientMap
.insert (make_pair (client
->Address
, client
));
551 nlinfo ("TCP-UDP linked TCP is %s, UDP is %s", client
->From
->asString().c_str(), client
->Address
.asString().c_str());
553 // Send a TCP message to the client to say that we can start
554 CMessage
msgout ("START");
555 CallbackServer
->send (msgout
, client
->From
);
559 if (it
== Clients
.end())
561 nlwarning ("Unknown TCP client, discard the UDP message (hacker?)");
571 nlwarning ("Received a UDP packet from an old client (hacker?)");
577 msgin
.serial(pingTime
);
579 uint32 pongNumber
= 0;
580 msgin
.serial(pongNumber
);
582 uint32 blockNumber
= 0;
583 msgin
.serial(blockNumber
);
585 // nlinfo ("receive a pong from %s pongnb %d %" NL_I64 "d", CurrentInMsg->AddrFrom.asString().c_str(), pongNumber, pongTime - pingTime);
587 client
->updatePong (pingTime
, pongTime
, pongNumber
, blockNumber
);
594 for (TClientMap::iterator it
= ClientMap
.begin (); it
!= ClientMap
.end(); it
++)
598 sint64 t
= CTime::getLocalTime ();
601 uint32 p
= GETCLIENTA(it
)->NextPingNumber
;
604 uint32 b
= GETCLIENTA(it
)->BlockNumber
;
608 while (msgout
.length() < 200)
609 msgout
.serial (dummy
);
611 uint32 size
= msgout
.length();
612 nlassert (size
== 200);
616 // send the new ping to the client
617 ReceiveTask
->DataSock
->sendTo (msgout
.buffer(), size
, GETCLIENTA(it
)->Address
);
619 catch (const Exception
&e
)
621 nlwarning ("Can't send UDP packet to '%s' (%s)", GETCLIENTA(it
)->Address
.asString().c_str(), e
.what());
624 GETCLIENTA(it
)->NextPingNumber
++;
625 GETCLIENTA(it
)->NbPing
++;
633 class CBenchService
: public IService
639 nlassert( ReceiveTask
==NULL
&& ReceiveThread
==NULL
);
641 // Create stat folder if necessary
643 if (!CFile::isExists (StatPathName
))
645 mkdir (StatPathName
.c_str());
648 // Create and start UDP server
650 nlinfo( "Starting external UDP socket on port %d", UDPPort
);
651 ReceiveTask
= new CReceiveTask (UDPPort
, MaxUDPPacketSize
);
652 CurrentReadQueue
= &Queue2
;
653 ReceiveTask
->setWriteQueue( &Queue1
);
654 nlassert( ReceiveTask
!= NULL
);
655 ReceiveThread
= IThread::create( ReceiveTask
);
656 nlassert( ReceiveThread
!= NULL
);
657 ReceiveThread
->start();
659 // Setup current message placeholder
660 CurrentInMsg
= new TReceivedMessage();
662 // Create the TCP server
664 nlinfo( "Starting external TCP socket on port %d", TCPPort
);
665 CallbackServer
= new CCallbackServer
;
666 CallbackServer
->addCallbackArray (CallbackArray
, sizeof(CallbackArray
)/sizeof(CallbackArray
[0]));
667 CallbackServer
->init (TCPPort
);
668 CallbackServer
->setDisconnectionCallback (cbDisconnect
, NULL
);
675 // Send ping to every client
678 // Update and manage TCP connections
679 CallbackServer
->update ();
683 if ( CurrentReadQueue
== &Queue1
)
685 CurrentReadQueue
= &Queue2
;
686 ReceiveTask
->setWriteQueue( &Queue1
);
690 CurrentReadQueue
= &Queue1
;
691 ReceiveTask
->setWriteQueue( &Queue2
);
694 // Update and manage UDP connections
695 while ( ! CurrentReadQueue
->empty() )
700 CurrentReadQueue
->front( CurrentInMsg
->data() );
701 CurrentReadQueue
->pop();
702 nlassert( ! CurrentReadQueue
->empty() );
703 CurrentReadQueue
->front( CurrentInMsg
->VAddrFrom
);
704 CurrentReadQueue
->pop();
705 CurrentInMsg
->vectorToAddress();
706 pongTime
= CurrentInMsg
->getDate ();
708 // Handle the UDP message
710 // Retrieve client info or add one
711 TClientMap::iterator ihm
= ClientMap
.find( CurrentInMsg
->AddrFrom
);
712 if ( ihm
== ClientMap
.end() )
714 if ( CurrentInMsg
->eventType() == TReceivedMessage::User
)
716 // Handle message for a new client
717 handleReceivedPong( NULL
, pongTime
);
721 nlinfo( "Not removing already removed client" );
727 if ( CurrentInMsg
->eventType() == TReceivedMessage::RemoveClient
)
730 removeClientByAddr( ihm
);
735 handleReceivedPong( GETCLIENTA(ihm
), pongTime
);
742 catch (const Exception
&e
)
744 nlerrornoex ("Exception not catched: '%s'", e
.what());
751 nlassert( ReceiveTask
!= NULL
);
752 nlassert( ReceiveThread
!= NULL
);
754 ReceiveTask
->requireExit();
755 ReceiveTask
->DataSock
->close();
756 ReceiveThread
->wait();
758 if (ReceiveThread
!= NULL
)
760 delete ReceiveThread
;
761 ReceiveThread
= NULL
;
764 if (ReceiveTask
!= NULL
)
770 if (CurrentInMsg
!= NULL
)
776 if (CallbackServer
!= NULL
)
778 delete CallbackServer
;
779 CallbackServer
= NULL
;
786 NLNET_SERVICE_MAIN (CBenchService
, "BS", "bench_service", 45459, EmptyCallbackArray
, UDP_DIR
, "")