1 // NeLNS - MMORPG Framework <http://dev.ryzom.com/projects/nel/>
2 // Copyright (C) 2010 Winch Gate Property Limited
4 // This source file has been modified by the following contributors:
5 // Copyright (C) 2019 Jan BOON (Kaetemi) <jan.boon@kaetemi.be>
7 // This program is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Affero General Public License as
9 // published by the Free Software Foundation, either version 3 of the
10 // License, or (at your option) any later version.
12 // This program is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU Affero General Public License for more details.
17 // You should have received a copy of the GNU Affero General Public License
18 // along with this program. If not, see <http://www.gnu.org/licenses/>.
22 #endif // HAVE_CONFIG_H
25 #define NELNS_CONFIG ""
26 #endif // NELNS_CONFIG
32 #include "nel/misc/types_nl.h"
40 typedef unsigned long ulong
;
45 #include <mysql_version.h>
48 #include "nel/misc/debug.h"
49 #include "nel/misc/config_file.h"
50 #include "nel/misc/path.h"
51 #include "nel/misc/command.h"
53 #include "nel/net/service.h"
54 #include "nel/net/varpath.h"
55 #include "nel/net/email.h"
57 #include "connection_web.h"
65 using namespace NLMISC
;
66 using namespace NLNET
;
70 // NeL Variables (for config file, etc)
73 // this variable should be used in conjunction with UseExplicitAESRegistration.
74 // the AS / AES registration process works as follows:
75 // - aes creates a layer 5 connection to as
76 // - as gets a serviceUp callback and looks in the database to try to find a match for the AES
77 // - if the match fails then AS sends a reject message to the AES
78 // - when the AES receives the reject message they check their UseExplicitAESRegistration flag - if it's set they
79 // attempt an explicit connection, sending the info required by the AS that would normally come from the database
80 // - when the AS receives an explicit registration, it verifies the state of the AllowExplicitAESRegistration flag
81 // and completes the registration work that failed earlier due to the database access failure
82 CVariable
<bool> AllowExplicitAESRegistration("as","AllowExplicitAESRegistration","flag to allow AES services to register explicitly",false,0,true);
84 // this variable allows one to launch an AS on a machine that doesn't have a database setup
85 // the functionality of the AS is reduced (particularly in respect to alarms and graphs which are configured via the database)
86 CVariable
<bool> DontUseDataBase("as","DontUseDataBase","if this flag is set calls to the database will be ignored",false,0,true);
95 CRequest (uint32 id
, TSockId from
) : Id(id
), NbWaiting(0), NbReceived(0), From(from
), NbRow(0), NbLines(1)
97 Time
= CTime::getSecondsSince1970 ();
104 uint32 Time
; // when the request was ask
109 vector
<vector
<string
> > Array
; // it's the 2 dimensional array that will be send to the php for variables
110 vector
<string
> Log
; // this log contains the answer if a command was asked, othewise, Array contains the results
112 uint32
getVariable(const string
&variable
)
114 for (uint32 i
= 0; i
< NbRow
; i
++)
115 if (Array
[i
][0] == variable
)
118 // need to add the variable
119 vector
<string
> NewRow
;
120 NewRow
.resize (NbLines
);
121 NewRow
[0] = variable
;
122 Array
.push_back (NewRow
);
128 for (uint32 i
= 0; i
< NbRow
; i
++)
129 Array
[i
].push_back("");
138 nlinfo ("Display answer array for request %d: %d row %d lines", Id
, NbRow
, NbLines
);
139 for (uint i
= 0; i
< NbLines
; i
++)
141 for (uint j
= 0; j
< NbRow
; j
++)
143 nlassert (Array
.size () == NbRow
);
144 InfoLog
->displayRaw ("%-20s", Array
[j
][i
].c_str());
146 InfoLog
->displayRawNL ("");
148 InfoLog
->displayRawNL ("End of the array");
152 nlinfo ("Display the log for request %d: %d lines", Id
, Log
.size());
153 for (uint i
= 0; i
< Log
.size(); i
++)
155 InfoLog
->displayRaw ("%s", Log
[i
].c_str());
157 InfoLog
->displayRawNL ("End of the log");
162 struct CAdminExecutorService
164 CAdminExecutorService (const string
&shard
, const string
&name
, TServiceId sid
) : Shard(shard
), SId(sid
), Name(name
) { }
166 string Shard
; /// Name of the shard
167 TServiceId SId
; /// uniq number to identify the AES
168 string Name
; /// name of the admin executor service
170 vector
<uint32
> WaitingRequestId
; /// contains all request that the server hasn't reply yet
174 typedef list
<CAdminExecutorService
> TAdminExecutorServices
;
175 typedef list
<CAdminExecutorService
>::iterator AESIT
;
182 TAdminExecutorServices AdminExecutorServices
;
184 MYSQL
*DatabaseConnection
= NULL
;
186 vector
<CRequest
> Requests
;
188 uint32 RequestTimeout
= 5; // in second
190 // cumulate 5 seconds of alert
191 sint32 AdminAlertAccumlationTime
= 5;
198 AESIT
findAES (TServiceId sid
, bool asrt
= true)
201 for (aesit
= AdminExecutorServices
.begin(); aesit
!= AdminExecutorServices
.end(); aesit
++)
202 if ((*aesit
).SId
== sid
)
206 nlassert (aesit
!= AdminExecutorServices
.end());
210 AESIT
findAES (const string
&name
, bool asrt
= true)
213 for (aesit
= AdminExecutorServices
.begin(); aesit
!= AdminExecutorServices
.end(); aesit
++)
214 if ((*aesit
).Name
== name
)
218 nlassert (aesit
!= AdminExecutorServices
.end());
228 MYSQL_RES
*sqlCurrentQueryResult
= NULL
;
230 MYSQL_ROW
sqlQuery (const char *format
, ...)
236 NLMISC_CONVERT_VARGS (query
, format
, 1024);
238 if (DatabaseConnection
== 0)
240 nlwarning ("MYSQL: mysql_query (%s) failed: DatabaseConnection is 0", query
);
244 int ret
= mysql_query (DatabaseConnection
, query
);
247 nlwarning ("MYSQL: mysql_query () failed for query '%s': %s", query
, mysql_error(DatabaseConnection
));
251 sqlCurrentQueryResult
= mysql_store_result(DatabaseConnection
);
252 if (sqlCurrentQueryResult
== 0)
254 nlwarning ("MYSQL: mysql_store_result () failed for query '%s': %s", query
, mysql_error(DatabaseConnection
));
258 MYSQL_ROW row
= mysql_fetch_row(sqlCurrentQueryResult
);
261 nlwarning ("MYSQL: mysql_fetch_row () failed for query '%s': %s", query
, mysql_error(DatabaseConnection
));
264 nldebug ("MYSQL: sqlQuery(%s) returns %d rows", query
, mysql_num_rows(sqlCurrentQueryResult
));
269 MYSQL_ROW
sqlNextRow ()
274 if (sqlCurrentQueryResult
== 0)
277 return mysql_fetch_row(sqlCurrentQueryResult
);
280 void sqlFlushResult()
285 if (sqlCurrentQueryResult
== NULL
)
288 mysql_free_result(sqlCurrentQueryResult
);
289 sqlCurrentQueryResult
= NULL
;
298 uint32 FirstEmailTime
= 0;
300 void sendAdminAlert (const char *format
, ...)
303 NLMISC_CONVERT_VARGS (text
, format
, 4096);
305 if (AdminAlertAccumlationTime
== -1)
307 // we don't send email so just display a warning
308 nlwarning ("ALERT: %s", text
);
312 if(Email
.empty() && FirstEmailTime
== 0)
315 FirstEmailTime
= CTime::getSecondsSince1970();
322 nldebug ("ALERT: pushing email into queue: %s", text
);
326 void updateSendAdminAlert ()
328 if(!Email
.empty() && FirstEmailTime
!= 0 && AdminAlertAccumlationTime
>=0 && CTime::getSecondsSince1970() > FirstEmailTime
+ AdminAlertAccumlationTime
)
330 vector
<string
> lines
;
331 explode (Email
, string("\n"), lines
, true);
336 if (IService::getInstance()->ConfigFile
.exists("SysLogPath") && IService::getInstance()->ConfigFile
.exists("SysLogParams"))
340 if (lines
.size() > 1)
342 param
= "Multiple problems, first is: ";
345 string res
= toString(IService::getInstance()->ConfigFile
.getVar("SysLogParams").asString().c_str(), param
.c_str());
346 launchProgram(IService::getInstance()->ConfigFile
.getVar("SysLogPath").asString(), res
);
349 if (IService::getInstance()->ConfigFile
.exists("AdminEmail"))
353 if (lines
.size() == 1)
359 subject
= "Multiple problems";
363 if(IService::getInstance()->ConfigFile
.exists("AdminEmailFrom"))
364 from
= IService::getInstance()->ConfigFile
.getVar("AdminEmailFrom").asString();
365 CConfigFile::CVar
&var
= IService::getInstance()->ConfigFile
.getVar("AdminEmail");
366 for (uint i
= 0; i
< var
.size(); i
++)
368 if (!sendEmail ("", from
, var
.asString(i
), subject
, Email
))
370 nlwarning ("Can't send email to '%s'", var
.asString(i
).c_str());
374 nlinfo ("ALERT: Sent email to admin %s the subject: %s", var
.asString(i
).c_str(), subject
.c_str());
386 static void cbAdminEmail (CMessage
&msgin
, const std::string
&serviceName
, TServiceId sid
)
390 sendAdminAlert (str
.c_str());
393 static void cbGraphUpdate (CMessage
&msgin
, const std::string
&serviceName
, TServiceId sid
)
396 msgin
.serial (CurrentTime
);
398 while (msgin
.getPos() < (sint32
)msgin
.length())
402 msgin
.serial (service
, var
, val
);
404 AESIT aesit
= findAES (sid
);
406 string shard
, server
;
407 shard
= (*aesit
).Shard
;
408 server
= (*aesit
).Name
;
410 if (!shard
.empty() && !server
.empty() && !service
.empty() && !var
.empty())
412 string path
= CPath::standardizePath (IService::getInstance()->ConfigFile
.getVar("RRDVarPath").asString());
413 string rrdfilename
= path
+ shard
+"."+server
+"."+service
+"."+var
+".rrd";
417 if (!NLMISC::CFile::fileExists(rrdfilename
))
419 MYSQL_ROW row
= sqlQuery ("select graph_update from variable where path like '%%%s' and graph_update!=0", var
.c_str());
422 uint32 freq
= atoi(row
[0]);
423 arg
= "create "+rrdfilename
+" --step "+toString(freq
)+" DS:var:GAUGE:"+toString(freq
*2)+":U:U RRA:AVERAGE:0.5:1:1000 RRA:AVERAGE:0.5:10:1000 RRA:AVERAGE:0.5:100:1000";
424 launchProgram(IService::getInstance()->ConfigFile
.getVar("RRDToolPath").asString(), arg
);
428 nlwarning ("Can't create the rrd because no graph_update in database");
433 arg
= "update " + rrdfilename
+ " " + toString (CurrentTime
) + ":" + toString(val
);
434 launchProgram(IService::getInstance()->ConfigFile
.getVar("RRDToolPath").asString(), arg
);
438 nlwarning ("Shard server service var val is empty");
448 uint32
newRequest (TSockId from
)
450 static uint32 NextId
= 5461231;
452 Requests
.push_back (CRequest(NextId
, from
));
457 void addRequestWaitingNb (uint32 rid
)
459 for (uint i
= 0 ; i
< Requests
.size (); i
++)
461 if (Requests
[i
].Id
== rid
)
463 Requests
[i
].NbWaiting
++;
464 Requests
[i
].Time
= CTime::getSecondsSince1970 ();
468 nlwarning ("REQUEST: Received an answer from an unknown resquest %d (perhaps due to a AS timeout)", rid
);
471 void subRequestWaitingNb (uint32 rid
)
473 for (uint i
= 0 ; i
< Requests
.size (); i
++)
475 if (Requests
[i
].Id
== rid
)
477 Requests
[i
].NbWaiting
--;
481 nlwarning ("REQUEST: Received an answer from an unknown resquest %d (perhaps due to a AS timeout)", rid
);
484 void addRequestReceived (uint32 rid
)
486 for (uint i
= 0 ; i
< Requests
.size (); i
++)
488 if (Requests
[i
].Id
== rid
)
490 Requests
[i
].NbReceived
++;
494 nlwarning ("REQUEST: Received an answer from an unknown resquest %d (perhaps due to a AS timeout)", rid
);
497 void addRequestAnswer (uint32 rid
, const vector
<string
> &variables
, const vector
<string
> &values
)
499 for (uint i
= 0 ; i
< Requests
.size (); i
++)
501 Requests
[i
].addLine ();
502 if (Requests
[i
].Id
== rid
)
504 if (!variables
.empty() && variables
[0]=="__log")
506 nlassert (variables
.size() == 1);
508 for (uint j
= 0; j
< values
.size(); j
++)
510 Requests
[i
].Log
.push_back (values
[j
]);
515 nlassert (variables
.size() == values
.size ());
516 for (uint j
= 0; j
< variables
.size(); j
++)
518 uint32 pos
= Requests
[i
].getVariable (variables
[j
]);
519 Requests
[i
].Array
[pos
][Requests
[i
].NbLines
-1] = values
[j
];
525 nlwarning ("REQUEST: Received an answer from an unknown resquest %d (perhaps due to a AS timeout)", rid
);
528 bool emptyRequest (uint32 rid
)
530 for (uint i
= 0 ; i
< Requests
.size (); i
++)
532 if (Requests
[i
].Id
== rid
&& Requests
[i
].NbWaiting
!= 0)
542 uint32 currentTime
= CTime::getSecondsSince1970 ();
546 for (uint i
= 0 ; i
< Requests
.size ();)
548 // the AES doesn't answer quickly
549 timeout
= (currentTime
>= Requests
[i
].Time
+RequestTimeout
);
551 if (Requests
[i
].NbWaiting
<= Requests
[i
].NbReceived
|| timeout
)
553 // the request is over, send to the php
559 nlwarning ("REQUEST: Request %d timeouted, only %d on %d services have replied", Requests
[i
].Id
, Requests
[i
].NbReceived
, Requests
[i
].NbWaiting
);
562 if (Requests
[i
].Log
.empty())
564 if (Requests
[i
].NbRow
== 0 && timeout
)
566 str
= "1 ((TIMEOUT))";
570 str
= toString(Requests
[i
].NbRow
) + " ";
571 for (uint k
= 0; k
< Requests
[i
].NbLines
; k
++)
573 for (uint j
= 0; j
< Requests
[i
].NbRow
; j
++)
575 nlassert (Requests
[i
].Array
.size () == Requests
[i
].NbRow
);
576 if (Requests
[i
].Array
[j
][k
].empty ())
580 str
+= Requests
[i
].Array
[j
][k
];
582 str
+= "((TIMEOUT))";
591 for (uint k
= 0; k
< Requests
[i
].Log
.size(); k
++)
593 str
+= Requests
[i
].Log
[k
];
595 str
+= "((TIMEOUT))";
599 sendString (Requests
[i
].From
, str
);
601 // set to 0 to erase it
602 Requests
[i
].NbWaiting
= 0;
605 if (Requests
[i
].NbWaiting
== 0)
607 Requests
.erase (Requests
.begin ()+i
);
626 MYSQL
*db
= mysql_init(NULL
);
629 nlerror ("mysql_init() failed");
633 if (mysql_options (db
, MYSQL_OPT_RECONNECT
, &opt
))
636 DatabaseConnection
= 0;
637 nlerror("mysql_options() failed for database connection to '%s'", IService::getInstance()->ConfigFile
.getVar("DatabaseHost").asString().c_str());
641 DatabaseConnection
= mysql_real_connect(db
,
642 IService::getInstance()->ConfigFile
.getVar("DatabaseHost").asString().c_str(),
643 IService::getInstance()->ConfigFile
.getVar("DatabaseLogin").asString().c_str(),
644 IService::getInstance()->ConfigFile
.getVar("DatabasePassword").asString().c_str(),
645 IService::getInstance()->ConfigFile
.getVar("DatabaseName").asString().c_str(),
647 if (DatabaseConnection
== NULL
|| DatabaseConnection
!= db
)
649 nlerror ("mysql_real_connect() failed to '%s' with login '%s' and database name '%s' with %s",
650 IService::getInstance()->ConfigFile
.getVar("DatabaseHost").asString().c_str(),
651 IService::getInstance()->ConfigFile
.getVar("DatabaseLogin").asString().c_str(),
652 IService::getInstance()->ConfigFile
.getVar("DatabaseName").asString().c_str(),
653 (IService::getInstance()->ConfigFile
.getVar("DatabasePassword").asString().empty()?"empty password":"password")
657 #if MYSQL_VERSION_ID < 50019
659 if (mysql_options (DatabaseConnection
, MYSQL_OPT_RECONNECT
, &opt
))
662 DatabaseConnection
= 0;
663 nlerror("mysql_options() failed for database connection to '%s'", IService::getInstance()->ConfigFile
.getVar("DatabaseHost").asString().c_str());
670 ////////////////////////////////////////////////////////////////////////////////////////////////////////
671 ////////////////////////////////////////////////////////////////////////////////////////////////////////
672 ////////////////// CONNECTION TO THE AES ///////////////////////////////////////////////////////////////
673 ////////////////////////////////////////////////////////////////////////////////////////////////////////
674 ////////////////////////////////////////////////////////////////////////////////////////////////////////
676 void sendAESInformation (TServiceId sid
)
678 AESIT aesit
= findAES (sid
);
680 vector
<string
> information
;
682 CMessage
msgout("AES_INFO");
685 // send services that should be running on this AES
687 information
.clear ();
688 MYSQL_ROW row
= sqlQuery ("select name from service where server='%s'", (*aesit
).Name
.c_str());
691 string service
= row
[0];
692 nlinfo ("Adding '%s' in registered services to AES-%hu", row
[0], sid
.get());
693 information
.push_back (service
);
697 msgout
.serialCont (information
);
700 // send variable alarms for services that should running on this AES
702 information
.clear ();
703 row
= sqlQuery ("select path, error_bound, alarm_order from variable where error_bound!=-1");
706 nlinfo ("Adding '%s' '%s' '%s' in alarm to AES-%hu", row
[0], row
[1], row
[2], sid
.get());
707 information
.push_back (row
[0]);
708 information
.push_back (row
[1]);
709 information
.push_back (row
[2]);
713 msgout
.serialCont (information
);
716 // send graph update for services that should running on this AES
718 information
.clear ();
719 row
= sqlQuery ("select path, graph_update from variable where graph_update!=0");
722 CVarPath
varpath (row
[0]);
724 for(uint i
= 0; i
< varpath
.Destination
.size(); i
++)
726 string a
= varpath
.Destination
[i
].first
, b
= (*aesit
).Shard
;
727 if(varpath
.Destination
[i
].first
== "*" || varpath
.Destination
[i
].first
== (*aesit
).Shard
)
729 CVarPath
varpath2 (varpath
.Destination
[i
].second
);
731 for(uint j
= 0; j
< varpath2
.Destination
.size(); j
++)
733 string c
= varpath2
.Destination
[j
].first
, d
= (*aesit
).Name
;
734 if(varpath2
.Destination
[j
].first
== "*" || varpath2
.Destination
[j
].first
== (*aesit
).Name
)
736 nlinfo ("Adding '%s' '%s' in graph to AES-%hu", row
[0], row
[1], sid
.get());
737 information
.push_back (row
[0]);
738 information
.push_back (row
[1]);
746 msgout
.serialCont (information
);
748 nlinfo ("Sending all information about %s AES-%hu (hostedservices, alarms,grapupdate)", (*aesit
).Name
.c_str(), (*aesit
).SId
.get());
749 CUnifiedNetwork::getInstance ()->send (sid
, msgout
);
752 void rejectAES(TServiceId sid
, const string
&res
)
754 CMessage
msgout("REJECTED");
755 msgout
.serial ((string
&)res
);
756 CUnifiedNetwork::getInstance ()->send (sid
, msgout
);
759 // i'm connected to a new admin executor service
760 static void cbNewAESConnection (const std::string
&serviceName
, TServiceId sid
, void *arg
)
763 CCallbackNetBase
*cnb
= CUnifiedNetwork::getInstance ()->getNetBase (sid
, from
);
764 const CInetAddress
&ia
= cnb
->hostAddress (from
);
766 AESIT aesit
= findAES (sid
, false);
768 if (aesit
!= AdminExecutorServices
.end ())
770 nlwarning ("Connection of an AES that is already in the list (%s)", ia
.asString ().c_str ());
771 rejectAES (sid
, "This AES is already in the AS list");
775 MYSQL_ROW row
= sqlQuery ("select name from server where address='%s'", ia
.ipAddress().c_str());
778 if (!AllowExplicitAESRegistration
)
780 nlwarning ("Connection of an AES that is not in database server list (%s)", ia
.asString ().c_str ());
784 nlinfo ("Rejecting auto-connection of an AES (%s) - this should provke explicitly reconnect", ia
.asString ().c_str ());
786 rejectAES (sid
, "This AES is not registered in the database");
790 string server
= row
[0];
793 row
= sqlQuery ("select shard from service where server='%s'", server
.c_str());
796 nlwarning ("Connection of an AES that is not in database server list (%s)", ia
.asString ().c_str ());
797 rejectAES (sid
, "This AES is not registered in the database");
801 string shard
= row
[0];
804 AdminExecutorServices
.push_back (CAdminExecutorService(shard
, server
, sid
));
806 nlinfo ("%s-%hu, server name %s, for shard %s connected and added in the list", serviceName
.c_str(), sid
.get(), server
.c_str(), shard
.c_str());
808 // send him services that should run on this server
809 sendAESInformation (sid
);
812 // i'm disconnected from an admin executor service
813 static void cbNewAESDisconnection (const std::string
&serviceName
, TServiceId sid
, void *arg
)
816 CCallbackNetBase
*cnb
= CUnifiedNetwork::getInstance ()->getNetBase (sid
, from
);
817 const CInetAddress
&ia
= cnb
->hostAddress (from
);
819 AESIT aesit
= findAES (sid
, false);
821 if (aesit
== AdminExecutorServices
.end ())
823 nlwarning ("Disconnection of %s-%hu that is not in my list (%s)", serviceName
.c_str (), sid
.get(), ia
.asString ().c_str ());
827 nlinfo ("%s-%hu, shard name %s, disconnected and removed from the list", serviceName
.c_str(), sid
.get(), (*aesit
).Name
.c_str ());
829 // we need to remove pending request
831 for(uint i
= 0; i
< (*aesit
).WaitingRequestId
.size (); i
++)
833 subRequestWaitingNb ((*aesit
).WaitingRequestId
[i
]);
836 AdminExecutorServices
.erase (aesit
);
839 // we receive an explicit registration message from an AES
840 void cbRegisterAES(CMessage
&msgin
, const std::string
&serviceName
, TServiceId sid
)
842 if (!AllowExplicitAESRegistration
)
844 nlwarning("Ignoring attempted AES registration because AllowExplicitAESRegistration==false");
852 msgin
.serial(server
);
857 nlwarning("Ignoring attempted AES registration due to execption during message decoding");
861 AdminExecutorServices
.push_back (CAdminExecutorService(shard
, server
, sid
));
863 nlinfo ("%s-%hu, server name %s, for shard %s connected and added in the list", serviceName
.c_str(), sid
.get(), server
.c_str(), shard
.c_str());
865 // send him services that should run on this server
866 sendAESInformation (sid
);
869 static void cbView (CMessage
&msgin
, const std::string
&serviceName
, TServiceId sid
)
874 AESIT aesit
= findAES (sid
);
876 for (uint i
= 0; i
< (*aesit
).WaitingRequestId
.size();)
878 if ((*aesit
).WaitingRequestId
[i
] == rid
)
880 (*aesit
).WaitingRequestId
.erase ((*aesit
).WaitingRequestId
.begin ()+i
);
888 MYSQL_ROW row
= sqlQuery ("select distinct shard from service where server='%s'", (*aesit
).Name
.c_str ());
890 // shard name is find using the "service" table, so, if there s no shard name in it, it returns ???
892 if (row
!= NULL
) shardName
= row
[0];
893 else shardName
= DontUseDataBase
? aesit
->Shard
: "???";
895 vector
<string
> vara
, vala
;
897 while ((uint32
)msgin
.getPos() < msgin
.length())
902 // adding default row
903 vara
.push_back ("shard");
904 vara
.push_back ("server");
906 vala
.push_back (shardName
);
907 vala
.push_back ((*aesit
).Name
);
913 for (i
= 0; i
< nb
; i
++)
921 vara
.push_back (var
);
924 if (vara
.size() > 0 && vara
[0] == "__log")
925 vala
.push_back ("----- Result from Shard "+shardName
+" Server "+(*aesit
).Name
+"\n");
928 for (i
= 0; i
< nb
; i
++)
931 vala
.push_back (val
);
933 addRequestAnswer (rid
, vara
, vala
);
937 // inc the NbReceived counter
938 addRequestReceived (rid
);
941 TUnifiedCallbackItem CallbackArray
[] =
943 { "REGISTER_AES", cbRegisterAES
},
945 { "ADMIN_EMAIL", cbAdminEmail
},
946 { "GRAPH_UPDATE", cbGraphUpdate
},
950 ////////////////////////////////////////////////////////////////////////////////////////////////////////
951 ////////////////////////////////////////////////////////////////////////////////////////////////////////
952 ////////////////// CONNECTION TO THE CLIENT ////////////////////////////////////////////////////////////
953 ////////////////////////////////////////////////////////////////////////////////////////////////////////
954 ////////////////////////////////////////////////////////////////////////////////////////////////////////
956 void addRequest (const string
&rawvarpath
, TSockId from
)
958 nlinfo ("addRequest from %s: '%s'", from
->asString ().c_str (), rawvarpath
.c_str ());
960 if(rawvarpath
.empty ())
962 // send an empty string to say to php that there's nothing
964 sendString (from
, str
);
971 if(rawvarpath
== "reload")
973 // it means the we have to resend the list of services managed by AES from the mysql tables
974 for (AESIT aesit
= AdminExecutorServices
.begin(); aesit
!= AdminExecutorServices
.end(); aesit
++)
976 sendAESInformation ((*aesit
).SId
);
979 // send an empty string to say to php that there's nothing
981 sendString (from
, str
);
989 CVarPath
varpath (rawvarpath
);
991 uint32 rid
= newRequest (from
);
993 for (uint i
= 0; i
< varpath
.Destination
.size (); i
++)
995 string shard
= varpath
.Destination
[i
].first
;
997 CVarPath
subvarpath (varpath
.Destination
[i
].second
);
999 for (uint j
= 0; j
< subvarpath
.Destination
.size (); j
++)
1001 string server
= subvarpath
.Destination
[j
].first
;
1003 if (shard
== "*" && server
== "*")
1005 // Send the request to all online servers of all online shards
1008 for (aesit
= AdminExecutorServices
.begin(); aesit
!= AdminExecutorServices
.end(); aesit
++)
1010 addRequestWaitingNb (rid
);
1011 (*aesit
).WaitingRequestId
.push_back (rid
);
1013 CMessage
msgout("AES_GET_VIEW");
1014 msgout
.serial (rid
);
1015 msgout
.serial (subvarpath
.Destination
[j
].second
);
1016 CUnifiedNetwork::getInstance ()->send ((*aesit
).SId
, msgout
);
1017 nlinfo ("REQUEST: Sent view '%s' to shard name %s 'AES-%hu'", subvarpath
.Destination
[j
].second
.c_str(), (*aesit
).Name
.c_str(), (*aesit
).SId
.get());
1020 else if (shard
== "*" && server
== "#")
1022 // Select all shard all server including offline one
1024 MYSQL_ROW row
= sqlQuery ("select distinct server, shard from service");
1028 AESIT aesit
= findAES (row
[0], false);
1030 if (aesit
!= AdminExecutorServices
.end())
1032 addRequestWaitingNb (rid
);
1033 (*aesit
).WaitingRequestId
.push_back (rid
);
1035 CMessage
msgout("AES_GET_VIEW");
1036 msgout
.serial (rid
);
1037 msgout
.serial (subvarpath
.Destination
[j
].second
);
1038 CUnifiedNetwork::getInstance ()->send ((*aesit
).SId
, msgout
);
1039 nlinfo ("REQUEST: Sent view '%s' to shard name %s 'AES-%hu'", subvarpath
.Destination
[j
].second
.c_str(), (*aesit
).Name
.c_str(), (*aesit
).SId
.get());
1042 else if (server
== "#")
1044 vector
<string
> vara
, vala
;
1046 // adding default row
1047 vara
.push_back ("shard");
1048 vala
.push_back (row
[1]);
1050 vara
.push_back ("server");
1051 vala
.push_back (row
[0]);
1053 vara
.push_back ("service");
1054 vala
.push_back ("AES");
1056 vara
.push_back ("State");
1057 vala
.push_back ("Offline");
1059 addRequestAnswer (rid
, vara
, vala
);
1061 row
= sqlNextRow ();
1065 else if (server
== "*" || server
== "#")
1067 // Send the request to all online server of a specific shard
1069 MYSQL_ROW row
= sqlQuery ("select distinct server from service where shard='%s'", shard
.c_str ());
1073 AESIT aesit
= findAES (row
[0], false);
1075 if (aesit
!= AdminExecutorServices
.end())
1077 addRequestWaitingNb (rid
);
1078 (*aesit
).WaitingRequestId
.push_back (rid
);
1080 CMessage
msgout("AES_GET_VIEW");
1081 msgout
.serial (rid
);
1082 msgout
.serial (subvarpath
.Destination
[j
].second
);
1083 CUnifiedNetwork::getInstance ()->send ((*aesit
).SId
, msgout
);
1084 nlinfo ("REQUEST: Sent view '%s' to shard name %s 'AES-%hu'", subvarpath
.Destination
[j
].second
.c_str(), (*aesit
).Name
.c_str(), (*aesit
).SId
.get());
1087 else if (server
== "#")
1089 vector
<string
> vara
, vala
;
1091 // adding default row
1092 vara
.push_back ("shard");
1093 vala
.push_back (shard
);
1095 vara
.push_back ("server");
1096 vala
.push_back (row
[0]);
1098 vara
.push_back ("service");
1099 vala
.push_back ("AES");
1101 vara
.push_back ("State");
1102 vala
.push_back ("Offline");
1104 addRequestAnswer (rid
, vara
, vala
);
1106 row
= sqlNextRow ();
1113 AESIT aesit
= findAES (server
, false);
1115 if (aesit
!= AdminExecutorServices
.end())
1117 addRequestWaitingNb (rid
);
1118 (*aesit
).WaitingRequestId
.push_back (rid
);
1120 CMessage
msgout("AES_GET_VIEW");
1121 msgout
.serial (rid
);
1122 msgout
.serial (subvarpath
.Destination
[j
].second
);
1123 CUnifiedNetwork::getInstance ()->send ((*aesit
).SId
, msgout
);
1124 nlinfo ("REQUEST: Sent view '%s' to shard name %s 'AES-%hu'", subvarpath
.Destination
[j
].second
.c_str(), (*aesit
).Name
.c_str(), (*aesit
).SId
.get());
1128 nlwarning ("Server %s is not found in the list", server
.c_str ());
1135 static void varRequestTimeout(CConfigFile::CVar
&var
)
1137 RequestTimeout
= var
.asInt();
1138 nlinfo ("Request timeout is now after %d seconds", RequestTimeout
);
1141 static void varAdminAlertAccumlationTime (CConfigFile::CVar
&var
)
1143 AdminAlertAccumlationTime
= var
.asInt();
1147 ////////////////////////////////////////////////////////////////////////////////////////////////////////
1148 ////////////////////////////////////////////////////////////////////////////////////////////////////////
1149 ////////////////// SERVICE IMPLEMENTATION //////////////////////////////////////////////////////////////
1150 ////////////////////////////////////////////////////////////////////////////////////////////////////////
1151 ////////////////////////////////////////////////////////////////////////////////////////////////////////
1153 class CAdminService
: public IService
1157 /// Init the service, load the universal time.
1160 setDefaultEmailParams (ConfigFile
.getVar ("SMTPServer").asString (), ConfigFile
.getVar("DefaultEmailFrom").asString(), "");
1164 connectionWebInit ();
1166 //CVarPath toto ("[toto");
1168 //CVarPath toto ("*.*.*.*");
1169 //CVarPath toto ("[srv1,srv2].*.*.*");
1170 //CVarPath toto ("[svr1.svc1,srv2.svc2].*.*");
1171 //CVarPath toto ("[svr1.[svc1,svc2].*.var1,srv2.svc2.fe*.var2].toto");
1172 //CVarPath toto ("[svr1.svc1.*.toto,srv2.svc2.*.tata]");
1174 CUnifiedNetwork::getInstance ()->setServiceUpCallback ("AES", cbNewAESConnection
);
1175 CUnifiedNetwork::getInstance ()->setServiceDownCallback ("AES", cbNewAESDisconnection
);
1177 varRequestTimeout (ConfigFile
.getVar ("RequestTimeout"));
1178 ConfigFile
.setCallback("RequestTimeout", &varRequestTimeout
);
1180 varAdminAlertAccumlationTime (ConfigFile
.getVar ("AdminAlertAccumlationTime"));
1181 ConfigFile
.setCallback("AdmimAlertAccumlationTime", &varAdminAlertAccumlationTime
);
1188 connectionWebUpdate ();
1190 updateSendAdminAlert ();
1196 connectionWebRelease ();
1202 NLNET_SERVICE_MAIN (CAdminService
, "AS", "admin_service", 49996, CallbackArray
, NELNS_CONFIG
, NELNS_LOGS
);
1205 NLMISC_COMMAND (getViewAS
, "send a view and receive an array as result", "<varpath>")
1208 for (uint i
= 0; i
< args
.size(); i
++)
1210 if (i
!= 0) cmd
+= " ";
1214 addRequest (cmd
, NULL
);
1219 NLMISC_COMMAND (clearRequests
, "clear all pending requests", "")
1221 if(args
.size() != 0) return false;
1223 // for all request, set the NbWaiting to NbReceived, next cleanRequest() will send answer and clear all request
1224 for (uint i
= 0 ; i
< Requests
.size (); i
++)
1226 if (Requests
[i
].NbWaiting
<= Requests
[i
].NbReceived
)
1228 Requests
[i
].NbWaiting
= Requests
[i
].NbReceived
;
1235 NLMISC_COMMAND (displayRequests
, "display all pending requests", "")
1237 if(args
.size() != 0) return false;
1239 log
.displayNL ("Display %d pending requests", Requests
.size ());
1240 for (uint i
= 0 ; i
< Requests
.size (); i
++)
1242 log
.displayNL ("id: %d wait: %d recv: %d from: %s nbrow: %d", Requests
[i
].Id
, Requests
[i
].NbWaiting
, Requests
[i
].NbReceived
, Requests
[i
].From
->asString ().c_str (), Requests
[i
].NbRow
);
1244 log
.displayNL ("End of display pending requests");
1249 NLMISC_COMMAND (generateAlert
, "generate an alert", "<text>")
1251 if(args
.size() != 1) return false;
1253 sendAdminAlert (args
[0].c_str());