1 // NeLNS - MMORPG Framework <http://dev.ryzom.com/projects/nel/>
2 // Copyright (C) 2010 Winch Gate Property Limited
4 // This source file has been modified by the following contributors:
5 // Copyright (C) 2019 Jan BOON (Kaetemi) <jan.boon@kaetemi.be>
7 // This program is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Affero General Public License as
9 // published by the Free Software Foundation, either version 3 of the
10 // License, or (at your option) any later version.
12 // This program is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU Affero General Public License for more details.
17 // You should have received a copy of the GNU Affero General Public License
18 // along with this program. If not, see <http://www.gnu.org/licenses/>.
22 #endif // HAVE_CONFIG_H
25 #define NELNS_CONFIG ""
26 #endif // NELNS_CONFIG
32 #include "nel/misc/types_nl.h"
40 typedef unsigned long ulong
;
45 #include <mysql_version.h>
48 #include "nel/misc/debug.h"
49 #include "nel/misc/config_file.h"
50 #include "nel/misc/path.h"
51 #include "nel/misc/command.h"
53 #include "nel/net/service.h"
54 #include "nel/net/varpath.h"
55 #include "nel/net/email.h"
57 #include "connection_web.h"
65 using namespace NLMISC
;
66 using namespace NLNET
;
70 // NeL Variables (for config file, etc)
73 // this variable should be used in conjunction with UseExplicitAESRegistration.
74 // the AS / AES registration process works as follows:
75 // - aes creates a layer 5 connection to as
76 // - as gets a serviceUp callback and looks in the database to try to find a match for the AES
77 // - if the match fails then AS sends a reject message to the AES
78 // - when the AES receives the reject message they check their UseExplicitAESRegistration flag - if it's set they
79 // attempt an explicit connection, sending the info required by the AS that would normally come from the database
80 // - when the AS receives an explicit registration, it verifies the state of the AllowExplicitAESRegistration flag
81 // and completes the registration work that failed earlier due to the database access failure
82 CVariable
<bool> AllowExplicitAESRegistration("as","AllowExplicitAESRegistration","flag to allow AES services to register explicitly",false,0,true);
84 // this variable allows one to launch an AS on a machine that doesn't have a database setup
85 // the functionality of the AS is reduced (particularly in respect to alarms and graphs which are configured via the database)
86 CVariable
<bool> DontUseDataBase("as","DontUseDataBase","if this flag is set calls to the database will be ignored",false,0,true);
95 CRequest (uint32 id
, TSockId from
) : Id(id
), NbWaiting(0), NbReceived(0), From(from
), NbRow(0), NbLines(1)
97 Time
= CTime::getSecondsSince1970 ();
104 uint32 Time
; // when the request was ask
109 vector
<vector
<string
> > Array
; // it's the 2 dimensional array that will be send to the php for variables
110 vector
<string
> Log
; // this log contains the answer if a command was asked, othewise, Array contains the results
112 uint32
getVariable(const string
&variable
)
114 for (uint32 i
= 0; i
< NbRow
; i
++)
115 if (Array
[i
][0] == variable
)
118 // need to add the variable
119 vector
<string
> NewRow
;
120 NewRow
.resize (NbLines
);
121 NewRow
[0] = variable
;
122 Array
.push_back (NewRow
);
128 for (uint32 i
= 0; i
< NbRow
; i
++)
129 Array
[i
].push_back("");
138 nlinfo ("Display answer array for request %d: %d row %d lines", Id
, NbRow
, NbLines
);
139 for (uint i
= 0; i
< NbLines
; i
++)
141 for (uint j
= 0; j
< NbRow
; j
++)
143 nlassert (Array
.size () == NbRow
);
144 InfoLog
->displayRaw ("%-20s", Array
[j
][i
].c_str());
146 InfoLog
->displayRawNL ("");
148 InfoLog
->displayRawNL ("End of the array");
152 nlinfo ("Display the log for request %d: %d lines", Id
, Log
.size());
153 for (uint i
= 0; i
< Log
.size(); i
++)
155 InfoLog
->displayRaw ("%s", Log
[i
].c_str());
157 InfoLog
->displayRawNL ("End of the log");
162 struct CAdminExecutorService
164 CAdminExecutorService (const string
&shard
, const string
&name
, TServiceId sid
) : Shard(shard
), SId(sid
), Name(name
) { }
166 string Shard
; /// Name of the shard
167 TServiceId SId
; /// uniq number to identify the AES
168 string Name
; /// name of the admin executor service
170 vector
<uint32
> WaitingRequestId
; /// contains all request that the server hasn't reply yet
174 typedef list
<CAdminExecutorService
> TAdminExecutorServices
;
175 typedef list
<CAdminExecutorService
>::iterator AESIT
;
182 TAdminExecutorServices AdminExecutorServices
;
184 MYSQL
*DatabaseConnection
= NULL
;
186 vector
<CRequest
> Requests
;
188 uint32 RequestTimeout
= 5; // in second
190 // cumulate 5 seconds of alert
191 sint32 AdminAlertAccumlationTime
= 5;
198 AESIT
findAES (TServiceId sid
, bool asrt
= true)
201 for (aesit
= AdminExecutorServices
.begin(); aesit
!= AdminExecutorServices
.end(); aesit
++)
202 if ((*aesit
).SId
== sid
)
206 nlassert (aesit
!= AdminExecutorServices
.end());
210 AESIT
findAES (const string
&name
, bool asrt
= true)
213 for (aesit
= AdminExecutorServices
.begin(); aesit
!= AdminExecutorServices
.end(); aesit
++)
214 if ((*aesit
).Name
== name
)
218 nlassert (aesit
!= AdminExecutorServices
.end());
228 MYSQL_RES
*sqlCurrentQueryResult
= NULL
;
230 MYSQL_ROW
sqlQuery (const char *format
, ...)
236 NLMISC_CONVERT_VARGS (query
, format
, 1024);
238 if (DatabaseConnection
== 0)
240 nlwarning ("MYSQL: mysql_query (%s) failed: DatabaseConnection is 0", query
);
244 int ret
= mysql_query (DatabaseConnection
, query
);
247 nlwarning ("MYSQL: mysql_query () failed for query '%s': %s", query
, mysql_error(DatabaseConnection
));
251 sqlCurrentQueryResult
= mysql_store_result(DatabaseConnection
);
252 if (sqlCurrentQueryResult
== 0)
254 nlwarning ("MYSQL: mysql_store_result () failed for query '%s': %s", query
, mysql_error(DatabaseConnection
));
258 MYSQL_ROW row
= mysql_fetch_row(sqlCurrentQueryResult
);
261 nlwarning ("MYSQL: mysql_fetch_row () failed for query '%s': %s", query
, mysql_error(DatabaseConnection
));
264 nldebug ("MYSQL: sqlQuery(%s) returns %d rows", query
, mysql_num_rows(sqlCurrentQueryResult
));
269 MYSQL_ROW
sqlNextRow ()
274 if (sqlCurrentQueryResult
== 0)
277 return mysql_fetch_row(sqlCurrentQueryResult
);
280 void sqlFlushResult()
285 if (sqlCurrentQueryResult
== NULL
)
288 mysql_free_result(sqlCurrentQueryResult
);
289 sqlCurrentQueryResult
= NULL
;
298 uint32 FirstEmailTime
= 0;
300 void sendAdminAlert (const char *format
, ...)
303 NLMISC_CONVERT_VARGS (text
, format
, 4096);
305 if (AdminAlertAccumlationTime
== -1)
307 // we don't send email so just display a warning
308 nlwarning ("ALERT: %s", text
);
312 if(Email
.empty() && FirstEmailTime
== 0)
315 FirstEmailTime
= CTime::getSecondsSince1970();
322 nldebug ("ALERT: pushing email into queue: %s", text
);
326 void updateSendAdminAlert ()
328 if(!Email
.empty() && FirstEmailTime
!= 0 && AdminAlertAccumlationTime
>=0 && CTime::getSecondsSince1970() > FirstEmailTime
+ AdminAlertAccumlationTime
)
330 vector
<string
> lines
;
331 explode (Email
, string("\n"), lines
, true);
336 if (IService::getInstance()->ConfigFile
.exists("SysLogPath") && IService::getInstance()->ConfigFile
.exists("SysLogParams"))
340 if (lines
.size() > 1)
342 param
= "Multiple problems, first is: ";
345 string res
= toString(IService::getInstance()->ConfigFile
.getVar("SysLogParams").asString().c_str(), param
.c_str());
346 launchProgram(IService::getInstance()->ConfigFile
.getVar("SysLogPath").asString(), res
);
349 if (IService::getInstance()->ConfigFile
.exists("AdminEmail"))
353 if (lines
.size() == 1)
359 subject
= "Multiple problems";
363 if(IService::getInstance()->ConfigFile
.exists("AdminEmailFrom"))
364 from
= IService::getInstance()->ConfigFile
.getVar("AdminEmailFrom").asString();
365 CConfigFile::CVar
&var
= IService::getInstance()->ConfigFile
.getVar("AdminEmail");
366 for (uint i
= 0; i
< var
.size(); i
++)
368 if (!sendEmail ("", from
, var
.asString(i
), subject
, Email
))
370 nlwarning ("Can't send email to '%s'", var
.asString(i
).c_str());
374 nlinfo ("ALERT: Sent email to admin %s the subject: %s", var
.asString(i
).c_str(), subject
.c_str());
386 static void cbAdminEmail (CMessage
&msgin
, const std::string
&serviceName
, TServiceId sid
)
390 sendAdminAlert (str
.c_str());
393 static void cbGraphUpdate (CMessage
&msgin
, const std::string
&serviceName
, TServiceId sid
)
396 msgin
.serial (CurrentTime
);
398 while (msgin
.getPos() < (sint32
)msgin
.length())
402 msgin
.serial (service
, var
, val
);
404 AESIT aesit
= findAES (sid
);
406 string shard
, server
;
407 shard
= (*aesit
).Shard
;
408 server
= (*aesit
).Name
;
410 if (!shard
.empty() && !server
.empty() && !service
.empty() && !var
.empty())
412 string path
= CPath::standardizePath (IService::getInstance()->ConfigFile
.getVar("RRDVarPath").asString());
413 string rrdfilename
= path
+ shard
+"."+server
+"."+service
+"."+var
+".rrd";
417 if (!NLMISC::CFile::fileExists(rrdfilename
))
419 MYSQL_ROW row
= sqlQuery ("select graph_update from variable where path like '%%%s' and graph_update!=0", var
.c_str());
422 uint32 freq
= atoi(row
[0]);
423 arg
= "create "+rrdfilename
+" --step "+toString(freq
)+" DS:var:GAUGE:"+toString(freq
*2)+":U:U RRA:AVERAGE:0.5:1:1000 RRA:AVERAGE:0.5:10:1000 RRA:AVERAGE:0.5:100:1000";
424 launchProgram(IService::getInstance()->ConfigFile
.getVar("RRDToolPath").asString(), arg
);
428 nlwarning ("Can't create the rrd because no graph_update in database");
433 arg
= "update " + rrdfilename
+ " " + toString (CurrentTime
) + ":" + toString(val
);
434 launchProgram(IService::getInstance()->ConfigFile
.getVar("RRDToolPath").asString(), arg
);
438 nlwarning ("Shard server service var val is empty");
448 uint32
newRequest (TSockId from
)
450 static uint32 NextId
= 5461231;
452 Requests
.push_back (CRequest(NextId
, from
));
457 void addRequestWaitingNb (uint32 rid
)
459 for (uint i
= 0 ; i
< Requests
.size (); i
++)
461 if (Requests
[i
].Id
== rid
)
463 Requests
[i
].NbWaiting
++;
464 Requests
[i
].Time
= CTime::getSecondsSince1970 ();
468 nlwarning ("REQUEST: Received an answer from an unknown resquest %d (perhaps due to a AS timeout)", rid
);
471 void subRequestWaitingNb (uint32 rid
)
473 for (uint i
= 0 ; i
< Requests
.size (); i
++)
475 if (Requests
[i
].Id
== rid
)
477 Requests
[i
].NbWaiting
--;
481 nlwarning ("REQUEST: Received an answer from an unknown resquest %d (perhaps due to a AS timeout)", rid
);
484 void addRequestReceived (uint32 rid
)
486 for (uint i
= 0 ; i
< Requests
.size (); i
++)
488 if (Requests
[i
].Id
== rid
)
490 Requests
[i
].NbReceived
++;
494 nlwarning ("REQUEST: Received an answer from an unknown resquest %d (perhaps due to a AS timeout)", rid
);
497 void addRequestAnswer (uint32 rid
, const vector
<string
> &variables
, const vector
<string
> &values
)
499 for (uint i
= 0 ; i
< Requests
.size (); i
++)
501 Requests
[i
].addLine ();
502 if (Requests
[i
].Id
== rid
)
504 if (!variables
.empty() && variables
[0]=="__log")
506 nlassert (variables
.size() == 1);
508 for (uint j
= 0; j
< values
.size(); j
++)
510 Requests
[i
].Log
.push_back (values
[j
]);
515 nlassert (variables
.size() == values
.size ());
516 for (uint j
= 0; j
< variables
.size(); j
++)
518 uint32 pos
= Requests
[i
].getVariable (variables
[j
]);
519 Requests
[i
].Array
[pos
][Requests
[i
].NbLines
-1] = values
[j
];
525 nlwarning ("REQUEST: Received an answer from an unknown resquest %d (perhaps due to a AS timeout)", rid
);
528 bool emptyRequest (uint32 rid
)
530 for (uint i
= 0 ; i
< Requests
.size (); i
++)
532 if (Requests
[i
].Id
== rid
&& Requests
[i
].NbWaiting
!= 0)
542 uint32 currentTime
= CTime::getSecondsSince1970 ();
546 for (uint i
= 0 ; i
< Requests
.size ();)
548 // the AES doesn't answer quickly
549 timeout
= (currentTime
>= Requests
[i
].Time
+RequestTimeout
);
551 if (Requests
[i
].NbWaiting
<= Requests
[i
].NbReceived
|| timeout
)
553 // the request is over, send to the php
559 nlwarning ("REQUEST: Request %d timeouted, only %d on %d services have replied", Requests
[i
].Id
, Requests
[i
].NbReceived
, Requests
[i
].NbWaiting
);
562 if (Requests
[i
].Log
.empty())
564 if (Requests
[i
].NbRow
== 0 && timeout
)
566 str
= "1 ((TIMEOUT))";
570 str
= toString(Requests
[i
].NbRow
) + " ";
571 for (uint k
= 0; k
< Requests
[i
].NbLines
; k
++)
573 for (uint j
= 0; j
< Requests
[i
].NbRow
; j
++)
575 nlassert (Requests
[i
].Array
.size () == Requests
[i
].NbRow
);
576 if (Requests
[i
].Array
[j
][k
].empty ())
580 str
+= Requests
[i
].Array
[j
][k
];
582 str
+= "((TIMEOUT))";
591 for (uint k
= 0; k
< Requests
[i
].Log
.size(); k
++)
593 str
+= Requests
[i
].Log
[k
];
595 str
+= "((TIMEOUT))";
599 sendString (Requests
[i
].From
, str
);
601 // set to 0 to erase it
602 Requests
[i
].NbWaiting
= 0;
605 if (Requests
[i
].NbWaiting
== 0)
607 Requests
.erase (Requests
.begin ()+i
);
626 MYSQL
*db
= mysql_init(NULL
);
629 nlerror ("mysql_init() failed");
632 #if LIBMYSQL_VERSION_ID < 80000
637 if (mysql_options (db
, MYSQL_OPT_RECONNECT
, &opt
))
640 DatabaseConnection
= 0;
641 nlerror("mysql_options() failed for database connection to '%s'", IService::getInstance()->ConfigFile
.getVar("DatabaseHost").asString().c_str());
645 DatabaseConnection
= mysql_real_connect(db
,
646 IService::getInstance()->ConfigFile
.getVar("DatabaseHost").asString().c_str(),
647 IService::getInstance()->ConfigFile
.getVar("DatabaseLogin").asString().c_str(),
648 IService::getInstance()->ConfigFile
.getVar("DatabasePassword").asString().c_str(),
649 IService::getInstance()->ConfigFile
.getVar("DatabaseName").asString().c_str(),
651 if (DatabaseConnection
== NULL
|| DatabaseConnection
!= db
)
653 nlerror ("mysql_real_connect() failed to '%s' with login '%s' and database name '%s' with %s",
654 IService::getInstance()->ConfigFile
.getVar("DatabaseHost").asString().c_str(),
655 IService::getInstance()->ConfigFile
.getVar("DatabaseLogin").asString().c_str(),
656 IService::getInstance()->ConfigFile
.getVar("DatabaseName").asString().c_str(),
657 (IService::getInstance()->ConfigFile
.getVar("DatabasePassword").asString().empty()?"empty password":"password")
661 #if MYSQL_VERSION_ID < 50019
663 if (mysql_options (DatabaseConnection
, MYSQL_OPT_RECONNECT
, &opt
))
666 DatabaseConnection
= 0;
667 nlerror("mysql_options() failed for database connection to '%s'", IService::getInstance()->ConfigFile
.getVar("DatabaseHost").asString().c_str());
674 ////////////////////////////////////////////////////////////////////////////////////////////////////////
675 ////////////////////////////////////////////////////////////////////////////////////////////////////////
676 ////////////////// CONNECTION TO THE AES ///////////////////////////////////////////////////////////////
677 ////////////////////////////////////////////////////////////////////////////////////////////////////////
678 ////////////////////////////////////////////////////////////////////////////////////////////////////////
680 void sendAESInformation (TServiceId sid
)
682 AESIT aesit
= findAES (sid
);
684 vector
<string
> information
;
686 CMessage
msgout("AES_INFO");
689 // send services that should be running on this AES
691 information
.clear ();
692 MYSQL_ROW row
= sqlQuery ("select name from service where server='%s'", (*aesit
).Name
.c_str());
695 string service
= row
[0];
696 nlinfo ("Adding '%s' in registered services to AES-%hu", row
[0], sid
.get());
697 information
.push_back (service
);
701 msgout
.serialCont (information
);
704 // send variable alarms for services that should be running on this AES
706 information
.clear ();
707 row
= sqlQuery ("select path, error_bound, alarm_order from variable where error_bound!=-1");
710 nlinfo ("Adding '%s' '%s' '%s' in alarm to AES-%hu", row
[0], row
[1], row
[2], sid
.get());
711 information
.push_back (row
[0]);
712 information
.push_back (row
[1]);
713 information
.push_back (row
[2]);
717 msgout
.serialCont (information
);
720 // send graph update for services that should be running on this AES
722 information
.clear ();
723 row
= sqlQuery ("select path, graph_update from variable where graph_update!=0");
726 CVarPath
varpath (row
[0]);
728 for(uint i
= 0; i
< varpath
.Destination
.size(); i
++)
730 string a
= varpath
.Destination
[i
].first
, b
= (*aesit
).Shard
;
731 if(varpath
.Destination
[i
].first
== "*" || varpath
.Destination
[i
].first
== (*aesit
).Shard
)
733 CVarPath
varpath2 (varpath
.Destination
[i
].second
);
735 for(uint j
= 0; j
< varpath2
.Destination
.size(); j
++)
737 string c
= varpath2
.Destination
[j
].first
, d
= (*aesit
).Name
;
738 if(varpath2
.Destination
[j
].first
== "*" || varpath2
.Destination
[j
].first
== (*aesit
).Name
)
740 nlinfo ("Adding '%s' '%s' in graph to AES-%hu", row
[0], row
[1], sid
.get());
741 information
.push_back (row
[0]);
742 information
.push_back (row
[1]);
750 msgout
.serialCont (information
);
752 nlinfo ("Sending all information about %s AES-%hu (hostedservices, alarms,grapupdate)", (*aesit
).Name
.c_str(), (*aesit
).SId
.get());
753 CUnifiedNetwork::getInstance ()->send (sid
, msgout
);
756 void rejectAES(TServiceId sid
, const string
&res
)
758 CMessage
msgout("REJECTED");
759 msgout
.serial ((string
&)res
);
760 CUnifiedNetwork::getInstance ()->send (sid
, msgout
);
763 // i'm connected to a new admin executor service
764 static void cbNewAESConnection (const std::string
&serviceName
, TServiceId sid
, void *arg
)
767 CCallbackNetBase
*cnb
= CUnifiedNetwork::getInstance ()->getNetBase (sid
, from
);
768 const CInetAddress
&ia
= cnb
->hostAddress (from
);
770 AESIT aesit
= findAES (sid
, false);
772 if (aesit
!= AdminExecutorServices
.end ())
774 nlwarning ("Connection of an AES that is already in the list (%s)", ia
.asString ().c_str ());
775 rejectAES (sid
, "This AES is already in the AS list");
779 MYSQL_ROW row
= sqlQuery ("select name from server where address='%s'", ia
.ipAddress().c_str());
782 if (!AllowExplicitAESRegistration
)
784 nlwarning ("Connection of an AES that is not in database server list (%s)", ia
.asString ().c_str ());
788 nlinfo ("Rejecting auto-connection of an AES (%s) - this should provke explicitly reconnect", ia
.asString ().c_str ());
790 rejectAES (sid
, "This AES is not registered in the database");
794 string server
= row
[0];
797 row
= sqlQuery ("select shard from service where server='%s'", server
.c_str());
800 nlwarning ("Connection of an AES that is not in database server list (%s)", ia
.asString ().c_str ());
801 rejectAES (sid
, "This AES is not registered in the database");
805 string shard
= row
[0];
808 AdminExecutorServices
.push_back (CAdminExecutorService(shard
, server
, sid
));
810 nlinfo ("%s-%hu, server name %s, for shard %s connected and added in the list", serviceName
.c_str(), sid
.get(), server
.c_str(), shard
.c_str());
812 // send him services that should run on this server
813 sendAESInformation (sid
);
816 // i'm disconnected from an admin executor service
817 static void cbNewAESDisconnection (const std::string
&serviceName
, TServiceId sid
, void *arg
)
820 CCallbackNetBase
*cnb
= CUnifiedNetwork::getInstance ()->getNetBase (sid
, from
);
821 const CInetAddress
&ia
= cnb
->hostAddress (from
);
823 AESIT aesit
= findAES (sid
, false);
825 if (aesit
== AdminExecutorServices
.end ())
827 nlwarning ("Disconnection of %s-%hu that is not in my list (%s)", serviceName
.c_str (), sid
.get(), ia
.asString ().c_str ());
831 nlinfo ("%s-%hu, shard name %s, disconnected and removed from the list", serviceName
.c_str(), sid
.get(), (*aesit
).Name
.c_str ());
833 // we need to remove pending request
835 for(uint i
= 0; i
< (*aesit
).WaitingRequestId
.size (); i
++)
837 subRequestWaitingNb ((*aesit
).WaitingRequestId
[i
]);
840 AdminExecutorServices
.erase (aesit
);
843 // we receive an explicit registration message from an AES
844 void cbRegisterAES(CMessage
&msgin
, const std::string
&serviceName
, TServiceId sid
)
846 if (!AllowExplicitAESRegistration
)
848 nlwarning("Ignoring attempted AES registration because AllowExplicitAESRegistration==false");
856 msgin
.serial(server
);
861 nlwarning("Ignoring attempted AES registration due to execption during message decoding");
865 AdminExecutorServices
.push_back (CAdminExecutorService(shard
, server
, sid
));
867 nlinfo ("%s-%hu, server name %s, for shard %s connected and added in the list", serviceName
.c_str(), sid
.get(), server
.c_str(), shard
.c_str());
869 // send him services that should run on this server
870 sendAESInformation (sid
);
873 static void cbView (CMessage
&msgin
, const std::string
&serviceName
, TServiceId sid
)
878 AESIT aesit
= findAES (sid
);
880 for (uint i
= 0; i
< (*aesit
).WaitingRequestId
.size();)
882 if ((*aesit
).WaitingRequestId
[i
] == rid
)
884 (*aesit
).WaitingRequestId
.erase ((*aesit
).WaitingRequestId
.begin ()+i
);
892 MYSQL_ROW row
= sqlQuery ("select distinct shard from service where server='%s'", (*aesit
).Name
.c_str ());
894 // the shard name is looked up in the "service" table; if there is no row for this server, fall back to the registered shard (when DontUseDataBase is set) or to "???"
896 if (row
!= NULL
) shardName
= row
[0];
897 else shardName
= DontUseDataBase
? aesit
->Shard
: "???";
899 vector
<string
> vara
, vala
;
901 while ((uint32
)msgin
.getPos() < msgin
.length())
906 // adding default row
907 vara
.push_back ("shard");
908 vara
.push_back ("server");
910 vala
.push_back (shardName
);
911 vala
.push_back ((*aesit
).Name
);
917 for (i
= 0; i
< nb
; i
++)
925 vara
.push_back (var
);
928 if (vara
.size() > 0 && vara
[0] == "__log")
929 vala
.push_back ("----- Result from Shard "+shardName
+" Server "+(*aesit
).Name
+"\n");
932 for (i
= 0; i
< nb
; i
++)
935 vala
.push_back (val
);
937 addRequestAnswer (rid
, vara
, vala
);
941 // inc the NbReceived counter
942 addRequestReceived (rid
);
945 TUnifiedCallbackItem CallbackArray
[] =
947 { "REGISTER_AES", cbRegisterAES
},
949 { "ADMIN_EMAIL", cbAdminEmail
},
950 { "GRAPH_UPDATE", cbGraphUpdate
},
954 ////////////////////////////////////////////////////////////////////////////////////////////////////////
955 ////////////////////////////////////////////////////////////////////////////////////////////////////////
956 ////////////////// CONNECTION TO THE CLIENT ////////////////////////////////////////////////////////////
957 ////////////////////////////////////////////////////////////////////////////////////////////////////////
958 ////////////////////////////////////////////////////////////////////////////////////////////////////////
960 void addRequest (const string
&rawvarpath
, TSockId from
)
962 nlinfo ("addRequest from %s: '%s'", from
->asString ().c_str (), rawvarpath
.c_str ());
964 if(rawvarpath
.empty ())
966 // send an empty string to say to php that there's nothing
968 sendString (from
, str
);
975 if(rawvarpath
== "reload")
977 // it means that we have to resend the list of services managed by AES from the mysql tables
978 for (AESIT aesit
= AdminExecutorServices
.begin(); aesit
!= AdminExecutorServices
.end(); aesit
++)
980 sendAESInformation ((*aesit
).SId
);
983 // send an empty string to say to php that there's nothing
985 sendString (from
, str
);
993 CVarPath
varpath (rawvarpath
);
995 uint32 rid
= newRequest (from
);
997 for (uint i
= 0; i
< varpath
.Destination
.size (); i
++)
999 string shard
= varpath
.Destination
[i
].first
;
1001 CVarPath
subvarpath (varpath
.Destination
[i
].second
);
1003 for (uint j
= 0; j
< subvarpath
.Destination
.size (); j
++)
1005 string server
= subvarpath
.Destination
[j
].first
;
1007 if (shard
== "*" && server
== "*")
1009 // Send the request to all online servers of all online shards
1012 for (aesit
= AdminExecutorServices
.begin(); aesit
!= AdminExecutorServices
.end(); aesit
++)
1014 addRequestWaitingNb (rid
);
1015 (*aesit
).WaitingRequestId
.push_back (rid
);
1017 CMessage
msgout("AES_GET_VIEW");
1018 msgout
.serial (rid
);
1019 msgout
.serial (subvarpath
.Destination
[j
].second
);
1020 CUnifiedNetwork::getInstance ()->send ((*aesit
).SId
, msgout
);
1021 nlinfo ("REQUEST: Sent view '%s' to shard name %s 'AES-%hu'", subvarpath
.Destination
[j
].second
.c_str(), (*aesit
).Name
.c_str(), (*aesit
).SId
.get());
1024 else if (shard
== "*" && server
== "#")
1026 // Select all shard all server including offline one
1028 MYSQL_ROW row
= sqlQuery ("select distinct server, shard from service");
1032 AESIT aesit
= findAES (row
[0], false);
1034 if (aesit
!= AdminExecutorServices
.end())
1036 addRequestWaitingNb (rid
);
1037 (*aesit
).WaitingRequestId
.push_back (rid
);
1039 CMessage
msgout("AES_GET_VIEW");
1040 msgout
.serial (rid
);
1041 msgout
.serial (subvarpath
.Destination
[j
].second
);
1042 CUnifiedNetwork::getInstance ()->send ((*aesit
).SId
, msgout
);
1043 nlinfo ("REQUEST: Sent view '%s' to shard name %s 'AES-%hu'", subvarpath
.Destination
[j
].second
.c_str(), (*aesit
).Name
.c_str(), (*aesit
).SId
.get());
1046 else if (server
== "#")
1048 vector
<string
> vara
, vala
;
1050 // adding default row
1051 vara
.push_back ("shard");
1052 vala
.push_back (row
[1]);
1054 vara
.push_back ("server");
1055 vala
.push_back (row
[0]);
1057 vara
.push_back ("service");
1058 vala
.push_back ("AES");
1060 vara
.push_back ("State");
1061 vala
.push_back ("Offline");
1063 addRequestAnswer (rid
, vara
, vala
);
1065 row
= sqlNextRow ();
1069 else if (server
== "*" || server
== "#")
1071 // Send the request to all online server of a specific shard
1073 MYSQL_ROW row
= sqlQuery ("select distinct server from service where shard='%s'", shard
.c_str ());
1077 AESIT aesit
= findAES (row
[0], false);
1079 if (aesit
!= AdminExecutorServices
.end())
1081 addRequestWaitingNb (rid
);
1082 (*aesit
).WaitingRequestId
.push_back (rid
);
1084 CMessage
msgout("AES_GET_VIEW");
1085 msgout
.serial (rid
);
1086 msgout
.serial (subvarpath
.Destination
[j
].second
);
1087 CUnifiedNetwork::getInstance ()->send ((*aesit
).SId
, msgout
);
1088 nlinfo ("REQUEST: Sent view '%s' to shard name %s 'AES-%hu'", subvarpath
.Destination
[j
].second
.c_str(), (*aesit
).Name
.c_str(), (*aesit
).SId
.get());
1091 else if (server
== "#")
1093 vector
<string
> vara
, vala
;
1095 // adding default row
1096 vara
.push_back ("shard");
1097 vala
.push_back (shard
);
1099 vara
.push_back ("server");
1100 vala
.push_back (row
[0]);
1102 vara
.push_back ("service");
1103 vala
.push_back ("AES");
1105 vara
.push_back ("State");
1106 vala
.push_back ("Offline");
1108 addRequestAnswer (rid
, vara
, vala
);
1110 row
= sqlNextRow ();
1117 AESIT aesit
= findAES (server
, false);
1119 if (aesit
!= AdminExecutorServices
.end())
1121 addRequestWaitingNb (rid
);
1122 (*aesit
).WaitingRequestId
.push_back (rid
);
1124 CMessage
msgout("AES_GET_VIEW");
1125 msgout
.serial (rid
);
1126 msgout
.serial (subvarpath
.Destination
[j
].second
);
1127 CUnifiedNetwork::getInstance ()->send ((*aesit
).SId
, msgout
);
1128 nlinfo ("REQUEST: Sent view '%s' to shard name %s 'AES-%hu'", subvarpath
.Destination
[j
].second
.c_str(), (*aesit
).Name
.c_str(), (*aesit
).SId
.get());
1132 nlwarning ("Server %s is not found in the list", server
.c_str ());
1139 static void varRequestTimeout(CConfigFile::CVar
&var
)
1141 RequestTimeout
= var
.asInt();
1142 nlinfo ("Request timeout is now after %d seconds", RequestTimeout
);
1145 static void varAdminAlertAccumlationTime (CConfigFile::CVar
&var
)
1147 AdminAlertAccumlationTime
= var
.asInt();
1151 ////////////////////////////////////////////////////////////////////////////////////////////////////////
1152 ////////////////////////////////////////////////////////////////////////////////////////////////////////
1153 ////////////////// SERVICE IMPLEMENTATION //////////////////////////////////////////////////////////////
1154 ////////////////////////////////////////////////////////////////////////////////////////////////////////
1155 ////////////////////////////////////////////////////////////////////////////////////////////////////////
1157 class CAdminService
: public IService
1161 /// Init the service, load the universal time.
1164 setDefaultEmailParams (ConfigFile
.getVar ("SMTPServer").asString (), ConfigFile
.getVar("DefaultEmailFrom").asString(), "");
1168 connectionWebInit ();
1170 //CVarPath toto ("[toto");
1172 //CVarPath toto ("*.*.*.*");
1173 //CVarPath toto ("[srv1,srv2].*.*.*");
1174 //CVarPath toto ("[svr1.svc1,srv2.svc2].*.*");
1175 //CVarPath toto ("[svr1.[svc1,svc2].*.var1,srv2.svc2.fe*.var2].toto");
1176 //CVarPath toto ("[svr1.svc1.*.toto,srv2.svc2.*.tata]");
1178 CUnifiedNetwork::getInstance ()->setServiceUpCallback ("AES", cbNewAESConnection
);
1179 CUnifiedNetwork::getInstance ()->setServiceDownCallback ("AES", cbNewAESDisconnection
);
1181 varRequestTimeout (ConfigFile
.getVar ("RequestTimeout"));
1182 ConfigFile
.setCallback("RequestTimeout", &varRequestTimeout
);
1184 varAdminAlertAccumlationTime (ConfigFile
.getVar ("AdminAlertAccumlationTime"));
1185 ConfigFile
.setCallback("AdmimAlertAccumlationTime", &varAdminAlertAccumlationTime
);
1192 connectionWebUpdate ();
1194 updateSendAdminAlert ();
1200 connectionWebRelease ();
1206 NLNET_SERVICE_MAIN (CAdminService
, "AS", "admin_service", 49996, CallbackArray
, NELNS_CONFIG
, NELNS_LOGS
);
1209 NLMISC_COMMAND (getViewAS
, "send a view and receive an array as result", "<varpath>")
1212 for (uint i
= 0; i
< args
.size(); i
++)
1214 if (i
!= 0) cmd
+= " ";
1218 addRequest (cmd
, NULL
);
1223 NLMISC_COMMAND (clearRequests
, "clear all pending requests", "")
1225 if(args
.size() != 0) return false;
1227 // for all request, set the NbWaiting to NbReceived, next cleanRequest() will send answer and clear all request
1228 for (uint i
= 0 ; i
< Requests
.size (); i
++)
1230 if (Requests
[i
].NbWaiting
<= Requests
[i
].NbReceived
)
1232 Requests
[i
].NbWaiting
= Requests
[i
].NbReceived
;
1239 NLMISC_COMMAND (displayRequests
, "display all pending requests", "")
1241 if(args
.size() != 0) return false;
1243 log
.displayNL ("Display %d pending requests", Requests
.size ());
1244 for (uint i
= 0 ; i
< Requests
.size (); i
++)
1246 log
.displayNL ("id: %d wait: %d recv: %d from: %s nbrow: %d", Requests
[i
].Id
, Requests
[i
].NbWaiting
, Requests
[i
].NbReceived
, Requests
[i
].From
->asString ().c_str (), Requests
[i
].NbRow
);
1248 log
.displayNL ("End of display pending requests");
1253 NLMISC_COMMAND (generateAlert
, "generate an alert", "<text>")
1255 if(args
.size() != 1) return false;
1257 sendAdminAlert (args
[0].c_str());