Fixed: Clear scene background if sky filter is used
[ryzomcore.git] / nelns / admin_service / admin_service.cpp
blob9c11347fc4346a254cf4439f8378103ca34d4ce1
1 // NeLNS - MMORPG Framework <http://dev.ryzom.com/projects/nel/>
2 // Copyright (C) 2010 Winch Gate Property Limited
3 //
4 // This source file has been modified by the following contributors:
5 // Copyright (C) 2019 Jan BOON (Kaetemi) <jan.boon@kaetemi.be>
6 //
7 // This program is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Affero General Public License as
9 // published by the Free Software Foundation, either version 3 of the
10 // License, or (at your option) any later version.
12 // This program is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU Affero General Public License for more details.
17 // You should have received a copy of the GNU Affero General Public License
18 // along with this program. If not, see <http://www.gnu.org/licenses/>.
20 #ifdef HAVE_CONFIG_H
21 #include "config.h"
22 #endif // HAVE_CONFIG_H
24 #ifndef NELNS_CONFIG
25 #define NELNS_CONFIG ""
26 #endif // NELNS_CONFIG
28 #ifndef NELNS_LOGS
29 #define NELNS_LOGS ""
30 #endif // NELNS_LOGS
32 #include "nel/misc/types_nl.h"
34 #include <string>
35 #include <list>
37 #ifdef NL_OS_WINDOWS
38 #include <winsock2.h>
39 #include <windows.h>
40 typedef unsigned long ulong;
41 #endif
43 #include <mysql.h>
44 #ifndef LIBMARIADB
45 #include <mysql_version.h>
46 #endif
48 #include "nel/misc/debug.h"
49 #include "nel/misc/config_file.h"
50 #include "nel/misc/path.h"
51 #include "nel/misc/command.h"
53 #include "nel/net/service.h"
54 #include "nel/net/varpath.h"
55 #include "nel/net/email.h"
57 #include "connection_web.h"
61 // Namespaces
64 using namespace std;
65 using namespace NLMISC;
66 using namespace NLNET;
70 // NeL Variables (for config file, etc)
73 // this variable should be used in conjunction with UseExplicitAESRegistration.
74 // the AS / AES registration process works as follows:
75 // - aes creates a layer 5 connection to as
76 // - as gets a serviceUp callback and looks in the database to try to find a match for the AES
77 // - if the match fails then AS sends a reject message to the AES
78 // - when the AES receives the reject message they check their UseExplicitAESRegistration flag - if it's set they
79 // attempt an explicit connection, sending the info required by the AS that would normally come from the database
80 // - when the AS receives an explicit registration, it verifies the state of the AllowExplicitAESRegistration flag
81 // and completes the registration work that failed earlier due to the database access failure
82 CVariable<bool> AllowExplicitAESRegistration("as","AllowExplicitAESRegistration","flag to allow AES services to register explicitly",false,0,true);
84 // this variable allows one to launch an AS on a machine that doesn't have a database setup
85 // the functionality of the AS is reduced (particularly in respect to alarms and graphs which are configured via the database)
86 CVariable<bool> DontUseDataBase("as","DontUseDataBase","if this flag is set calls to the database will be ignored",false,0,true);
90 // Structures
93 struct CRequest
95 CRequest (uint32 id, TSockId from) : Id(id), NbWaiting(0), NbReceived(0), From(from), NbRow(0), NbLines(1)
97 Time = CTime::getSecondsSince1970 ();
100 uint32 Id;
101 uint NbWaiting;
102 uint32 NbReceived;
103 TSockId From;
104 uint32 Time; // when the request was ask
106 uint32 NbRow;
107 uint32 NbLines;
109 vector<vector<string> > Array; // it's the 2 dimensional array that will be send to the php for variables
110 vector<string> Log; // this log contains the answer if a command was asked, othewise, Array contains the results
112 uint32 getVariable(const string &variable)
114 for (uint32 i = 0; i < NbRow; i++)
115 if (Array[i][0] == variable)
116 return i;
118 // need to add the variable
119 vector<string> NewRow;
120 NewRow.resize (NbLines);
121 NewRow[0] = variable;
122 Array.push_back (NewRow);
123 return NbRow++;
126 void addLine ()
128 for (uint32 i = 0; i < NbRow; i++)
129 Array[i].push_back("");
131 NbLines++;
134 void display ()
136 if (Log.empty())
138 nlinfo ("Display answer array for request %d: %d row %d lines", Id, NbRow, NbLines);
139 for (uint i = 0; i < NbLines; i++)
141 for (uint j = 0; j < NbRow; j++)
143 nlassert (Array.size () == NbRow);
144 InfoLog->displayRaw ("%-20s", Array[j][i].c_str());
146 InfoLog->displayRawNL ("");
148 InfoLog->displayRawNL ("End of the array");
150 else
152 nlinfo ("Display the log for request %d: %d lines", Id, Log.size());
153 for (uint i = 0; i < Log.size(); i++)
155 InfoLog->displayRaw ("%s", Log[i].c_str());
157 InfoLog->displayRawNL ("End of the log");
162 struct CAdminExecutorService
164 CAdminExecutorService (const string &shard, const string &name, TServiceId sid) : Shard(shard), SId(sid), Name(name) { }
166 string Shard; /// Name of the shard
167 TServiceId SId; /// uniq number to identify the AES
168 string Name; /// name of the admin executor service
170 vector<uint32> WaitingRequestId; /// contains all request that the server hasn't reply yet
174 typedef list<CAdminExecutorService> TAdminExecutorServices;
175 typedef list<CAdminExecutorService>::iterator AESIT;
179 // Variables
182 TAdminExecutorServices AdminExecutorServices;
184 MYSQL *DatabaseConnection = NULL;
186 vector<CRequest> Requests;
188 uint32 RequestTimeout = 5; // in second
190 // cumulate 5 seconds of alert
191 sint32 AdminAlertAccumlationTime = 5;
195 // Functions
198 AESIT findAES (TServiceId sid, bool asrt = true)
200 AESIT aesit;
201 for (aesit = AdminExecutorServices.begin(); aesit != AdminExecutorServices.end(); aesit++)
202 if ((*aesit).SId == sid)
203 break;
205 if (asrt)
206 nlassert (aesit != AdminExecutorServices.end());
207 return aesit;
210 AESIT findAES (const string &name, bool asrt = true)
212 AESIT aesit;
213 for (aesit = AdminExecutorServices.begin(); aesit != AdminExecutorServices.end(); aesit++)
214 if ((*aesit).Name == name)
215 break;
217 if (asrt)
218 nlassert (aesit != AdminExecutorServices.end());
220 return aesit;
225 // SQL helpers
228 MYSQL_RES *sqlCurrentQueryResult = NULL;
230 MYSQL_ROW sqlQuery (const char *format, ...)
232 if (DontUseDataBase)
233 return 0;
235 char *query;
236 NLMISC_CONVERT_VARGS (query, format, 1024);
238 if (DatabaseConnection == 0)
240 nlwarning ("MYSQL: mysql_query (%s) failed: DatabaseConnection is 0", query);
241 return NULL;
244 int ret = mysql_query (DatabaseConnection, query);
245 if (ret != 0)
247 nlwarning ("MYSQL: mysql_query () failed for query '%s': %s", query, mysql_error(DatabaseConnection));
248 return 0;
251 sqlCurrentQueryResult = mysql_store_result(DatabaseConnection);
252 if (sqlCurrentQueryResult == 0)
254 nlwarning ("MYSQL: mysql_store_result () failed for query '%s': %s", query, mysql_error(DatabaseConnection));
255 return 0;
258 MYSQL_ROW row = mysql_fetch_row(sqlCurrentQueryResult);
259 if (row == 0)
261 nlwarning ("MYSQL: mysql_fetch_row () failed for query '%s': %s", query, mysql_error(DatabaseConnection));
264 nldebug ("MYSQL: sqlQuery(%s) returns %d rows", query, mysql_num_rows(sqlCurrentQueryResult));
266 return row;
269 MYSQL_ROW sqlNextRow ()
271 if (DontUseDataBase)
272 return 0;
274 if (sqlCurrentQueryResult == 0)
275 return 0;
277 return mysql_fetch_row(sqlCurrentQueryResult);
280 void sqlFlushResult()
282 if (DontUseDataBase)
283 return;
285 if (sqlCurrentQueryResult == NULL)
286 return;
288 mysql_free_result(sqlCurrentQueryResult);
289 sqlCurrentQueryResult = NULL;
294 // Admin functions
297 string Email;
298 uint32 FirstEmailTime = 0;
300 void sendAdminAlert (const char *format, ...)
302 char *text;
303 NLMISC_CONVERT_VARGS (text, format, 4096);
305 if (AdminAlertAccumlationTime == -1)
307 // we don't send email so just display a warning
308 nlwarning ("ALERT: %s", text);
310 else
312 if(Email.empty() && FirstEmailTime == 0)
314 Email += text;
315 FirstEmailTime = CTime::getSecondsSince1970();
317 else
319 Email += "\n";
320 Email += text;
322 nldebug ("ALERT: pushing email into queue: %s", text);
326 void updateSendAdminAlert ()
328 if(!Email.empty() && FirstEmailTime != 0 && AdminAlertAccumlationTime >=0 && CTime::getSecondsSince1970() > FirstEmailTime + AdminAlertAccumlationTime)
330 vector<string> lines;
331 explode (Email, string("\n"), lines, true);
333 if (!lines.empty())
336 if (IService::getInstance()->ConfigFile.exists("SysLogPath") && IService::getInstance()->ConfigFile.exists("SysLogParams"))
338 // syslog
339 string param;
340 if (lines.size() > 1)
342 param = "Multiple problems, first is: ";
344 param += lines[0];
345 string res = toString(IService::getInstance()->ConfigFile.getVar("SysLogParams").asString().c_str(), param.c_str());
346 launchProgram(IService::getInstance()->ConfigFile.getVar("SysLogPath").asString(), res);
349 if (IService::getInstance()->ConfigFile.exists("AdminEmail"))
351 // email
352 string subject;
353 if (lines.size() == 1)
355 subject = lines[0];
357 else
359 subject = "Multiple problems";
362 std::string from;
363 if(IService::getInstance()->ConfigFile.exists("AdminEmailFrom"))
364 from = IService::getInstance()->ConfigFile.getVar("AdminEmailFrom").asString();
365 CConfigFile::CVar &var = IService::getInstance()->ConfigFile.getVar("AdminEmail");
366 for (uint i = 0; i < var.size(); i++)
368 if (!sendEmail ("", from, var.asString(i), subject, Email))
370 nlwarning ("Can't send email to '%s'", var.asString(i).c_str());
372 else
374 nlinfo ("ALERT: Sent email to admin %s the subject: %s", var.asString(i).c_str(), subject.c_str());
380 Email = "";
381 FirstEmailTime = 0;
386 static void cbAdminEmail (CMessage &msgin, const std::string &serviceName, TServiceId sid)
388 string str;
389 msgin.serial(str);
390 sendAdminAlert (str.c_str());
393 static void cbGraphUpdate (CMessage &msgin, const std::string &serviceName, TServiceId sid)
395 uint32 CurrentTime;
396 msgin.serial (CurrentTime);
398 while (msgin.getPos() < (sint32)msgin.length())
400 string var, service;
401 sint32 val;
402 msgin.serial (service, var, val);
404 AESIT aesit = findAES (sid);
406 string shard, server;
407 shard = (*aesit).Shard;
408 server = (*aesit).Name;
410 if (!shard.empty() && !server.empty() && !service.empty() && !var.empty())
412 string path = CPath::standardizePath (IService::getInstance()->ConfigFile.getVar("RRDVarPath").asString());
413 string rrdfilename = path + shard+"."+server+"."+service+"."+var+".rrd";
415 string arg;
417 if (!NLMISC::CFile::fileExists(rrdfilename))
419 MYSQL_ROW row = sqlQuery ("select graph_update from variable where path like '%%%s' and graph_update!=0", var.c_str());
420 if (row != NULL)
422 uint32 freq = atoi(row[0]);
423 arg = "create "+rrdfilename+" --step "+toString(freq)+" DS:var:GAUGE:"+toString(freq*2)+":U:U RRA:AVERAGE:0.5:1:1000 RRA:AVERAGE:0.5:10:1000 RRA:AVERAGE:0.5:100:1000";
424 launchProgram(IService::getInstance()->ConfigFile.getVar("RRDToolPath").asString(), arg);
426 else
428 nlwarning ("Can't create the rrd because no graph_update in database");
430 sqlFlushResult();
433 arg = "update " + rrdfilename + " " + toString (CurrentTime) + ":" + toString(val);
434 launchProgram(IService::getInstance()->ConfigFile.getVar("RRDToolPath").asString(), arg);
436 else
438 nlwarning ("Shard server service var val is empty");
445 // Request functions
448 uint32 newRequest (TSockId from)
450 static uint32 NextId = 5461231;
452 Requests.push_back (CRequest(NextId, from));
454 return NextId++;
457 void addRequestWaitingNb (uint32 rid)
459 for (uint i = 0 ; i < Requests.size (); i++)
461 if (Requests[i].Id == rid)
463 Requests[i].NbWaiting++;
464 Requests[i].Time = CTime::getSecondsSince1970 ();
465 return;
468 nlwarning ("REQUEST: Received an answer from an unknown resquest %d (perhaps due to a AS timeout)", rid);
471 void subRequestWaitingNb (uint32 rid)
473 for (uint i = 0 ; i < Requests.size (); i++)
475 if (Requests[i].Id == rid)
477 Requests[i].NbWaiting--;
478 return;
481 nlwarning ("REQUEST: Received an answer from an unknown resquest %d (perhaps due to a AS timeout)", rid);
484 void addRequestReceived (uint32 rid)
486 for (uint i = 0 ; i < Requests.size (); i++)
488 if (Requests[i].Id == rid)
490 Requests[i].NbReceived++;
491 return;
494 nlwarning ("REQUEST: Received an answer from an unknown resquest %d (perhaps due to a AS timeout)", rid);
497 void addRequestAnswer (uint32 rid, const vector<string> &variables, const vector<string> &values)
499 for (uint i = 0 ; i < Requests.size (); i++)
501 Requests[i].addLine ();
502 if (Requests[i].Id == rid)
504 if (!variables.empty() && variables[0]=="__log")
506 nlassert (variables.size() == 1);
508 for (uint j = 0; j < values.size(); j++)
510 Requests[i].Log.push_back (values[j]);
513 else
515 nlassert (variables.size() == values.size ());
516 for (uint j = 0; j < variables.size(); j++)
518 uint32 pos = Requests[i].getVariable (variables[j]);
519 Requests[i].Array[pos][Requests[i].NbLines-1] = values[j];
522 return;
525 nlwarning ("REQUEST: Received an answer from an unknown resquest %d (perhaps due to a AS timeout)", rid);
528 bool emptyRequest (uint32 rid)
530 for (uint i = 0 ; i < Requests.size (); i++)
532 if (Requests[i].Id == rid && Requests[i].NbWaiting != 0)
534 return false;
537 return true;
540 void cleanRequest ()
542 uint32 currentTime = CTime::getSecondsSince1970 ();
544 bool timeout;
546 for (uint i = 0 ; i < Requests.size ();)
548 // the AES doesn't answer quickly
549 timeout = (currentTime >= Requests[i].Time+RequestTimeout);
551 if (Requests[i].NbWaiting <= Requests[i].NbReceived || timeout)
553 // the request is over, send to the php
555 string str;
557 if (timeout)
559 nlwarning ("REQUEST: Request %d timeouted, only %d on %d services have replied", Requests[i].Id, Requests[i].NbReceived, Requests[i].NbWaiting);
562 if (Requests[i].Log.empty())
564 if (Requests[i].NbRow == 0 && timeout)
566 str = "1 ((TIMEOUT))";
568 else
570 str = toString(Requests[i].NbRow) + " ";
571 for (uint k = 0; k < Requests[i].NbLines; k++)
573 for (uint j = 0; j < Requests[i].NbRow; j++)
575 nlassert (Requests[i].Array.size () == Requests[i].NbRow);
576 if (Requests[i].Array[j][k].empty ())
577 str += "??? ";
578 else
580 str += Requests[i].Array[j][k];
581 if (timeout)
582 str += "((TIMEOUT))";
583 str += " ";
589 else
591 for (uint k = 0; k < Requests[i].Log.size(); k++)
593 str += Requests[i].Log[k];
594 if (timeout)
595 str += "((TIMEOUT))";
599 sendString (Requests[i].From, str);
601 // set to 0 to erase it
602 Requests[i].NbWaiting = 0;
605 if (Requests[i].NbWaiting == 0)
607 Requests.erase (Requests.begin ()+i);
609 else
611 i++;
618 // SQL functions
621 void sqlInit ()
623 if (DontUseDataBase)
624 return;
626 MYSQL *db = mysql_init(NULL);
627 if(db == NULL)
629 nlerror ("mysql_init() failed");
632 my_bool opt = true;
633 if (mysql_options (db, MYSQL_OPT_RECONNECT, &opt))
635 mysql_close(db);
636 DatabaseConnection = 0;
637 nlerror("mysql_options() failed for database connection to '%s'", IService::getInstance()->ConfigFile.getVar("DatabaseHost").asString().c_str());
638 return;
641 DatabaseConnection = mysql_real_connect(db,
642 IService::getInstance()->ConfigFile.getVar("DatabaseHost").asString().c_str(),
643 IService::getInstance()->ConfigFile.getVar("DatabaseLogin").asString().c_str(),
644 IService::getInstance()->ConfigFile.getVar("DatabasePassword").asString().c_str(),
645 IService::getInstance()->ConfigFile.getVar("DatabaseName").asString().c_str(),
646 0,NULL,0);
647 if (DatabaseConnection == NULL || DatabaseConnection != db)
649 nlerror ("mysql_real_connect() failed to '%s' with login '%s' and database name '%s' with %s",
650 IService::getInstance()->ConfigFile.getVar("DatabaseHost").asString().c_str(),
651 IService::getInstance()->ConfigFile.getVar("DatabaseLogin").asString().c_str(),
652 IService::getInstance()->ConfigFile.getVar("DatabaseName").asString().c_str(),
653 (IService::getInstance()->ConfigFile.getVar("DatabasePassword").asString().empty()?"empty password":"password")
657 #if MYSQL_VERSION_ID < 50019
658 opt = true;
659 if (mysql_options (DatabaseConnection, MYSQL_OPT_RECONNECT, &opt))
661 mysql_close(db);
662 DatabaseConnection = 0;
663 nlerror("mysql_options() failed for database connection to '%s'", IService::getInstance()->ConfigFile.getVar("DatabaseHost").asString().c_str());
664 return;
666 #endif
670 ////////////////////////////////////////////////////////////////////////////////////////////////////////
671 ////////////////////////////////////////////////////////////////////////////////////////////////////////
672 ////////////////// CONNECTION TO THE AES ///////////////////////////////////////////////////////////////
673 ////////////////////////////////////////////////////////////////////////////////////////////////////////
674 ////////////////////////////////////////////////////////////////////////////////////////////////////////
676 void sendAESInformation (TServiceId sid)
678 AESIT aesit = findAES (sid);
680 vector<string> information;
682 CMessage msgout("AES_INFO");
685 // send services that should be running on this AES
687 information.clear ();
688 MYSQL_ROW row = sqlQuery ("select name from service where server='%s'", (*aesit).Name.c_str());
689 while (row != NULL)
691 string service = row[0];
692 nlinfo ("Adding '%s' in registered services to AES-%hu", row[0], sid.get());
693 information.push_back (service);
694 row = sqlNextRow ();
696 sqlFlushResult();
697 msgout.serialCont (information);
700 // send variable alarms for services that should running on this AES
702 information.clear ();
703 row = sqlQuery ("select path, error_bound, alarm_order from variable where error_bound!=-1");
704 while (row != NULL)
706 nlinfo ("Adding '%s' '%s' '%s' in alarm to AES-%hu", row[0], row[1], row[2], sid.get());
707 information.push_back (row[0]);
708 information.push_back (row[1]);
709 information.push_back (row[2]);
710 row = sqlNextRow ();
712 sqlFlushResult();
713 msgout.serialCont (information);
716 // send graph update for services that should running on this AES
718 information.clear ();
719 row = sqlQuery ("select path, graph_update from variable where graph_update!=0");
720 while (row != NULL)
722 CVarPath varpath (row[0]);
724 for(uint i = 0; i < varpath.Destination.size(); i++)
726 string a = varpath.Destination[i].first, b = (*aesit).Shard;
727 if(varpath.Destination[i].first == "*" || varpath.Destination[i].first == (*aesit).Shard)
729 CVarPath varpath2 (varpath.Destination[i].second);
731 for(uint j = 0; j < varpath2.Destination.size(); j++)
733 string c = varpath2.Destination[j].first, d = (*aesit).Name;
734 if(varpath2.Destination[j].first == "*" || varpath2.Destination[j].first == (*aesit).Name)
736 nlinfo ("Adding '%s' '%s' in graph to AES-%hu", row[0], row[1], sid.get());
737 information.push_back (row[0]);
738 information.push_back (row[1]);
743 row = sqlNextRow ();
745 sqlFlushResult();
746 msgout.serialCont (information);
748 nlinfo ("Sending all information about %s AES-%hu (hostedservices, alarms,grapupdate)", (*aesit).Name.c_str(), (*aesit).SId.get());
749 CUnifiedNetwork::getInstance ()->send (sid, msgout);
752 void rejectAES(TServiceId sid, const string &res)
754 CMessage msgout("REJECTED");
755 msgout.serial ((string &)res);
756 CUnifiedNetwork::getInstance ()->send (sid, msgout);
759 // i'm connected to a new admin executor service
760 static void cbNewAESConnection (const std::string &serviceName, TServiceId sid, void *arg)
762 TSockId from;
763 CCallbackNetBase *cnb = CUnifiedNetwork::getInstance ()->getNetBase (sid, from);
764 const CInetAddress &ia = cnb->hostAddress (from);
766 AESIT aesit = findAES (sid, false);
768 if (aesit != AdminExecutorServices.end ())
770 nlwarning ("Connection of an AES that is already in the list (%s)", ia.asString ().c_str ());
771 rejectAES (sid, "This AES is already in the AS list");
772 return;
775 MYSQL_ROW row = sqlQuery ("select name from server where address='%s'", ia.ipAddress().c_str());
776 if (row == NULL)
778 if (!AllowExplicitAESRegistration)
780 nlwarning ("Connection of an AES that is not in database server list (%s)", ia.asString ().c_str ());
782 else
784 nlinfo ("Rejecting auto-connection of an AES (%s) - this should provke explicitly reconnect", ia.asString ().c_str ());
786 rejectAES (sid, "This AES is not registered in the database");
787 sqlFlushResult();
788 return;
790 string server = row[0];
791 sqlFlushResult();
793 row = sqlQuery ("select shard from service where server='%s'", server.c_str());
794 if (row == NULL)
796 nlwarning ("Connection of an AES that is not in database server list (%s)", ia.asString ().c_str ());
797 rejectAES (sid, "This AES is not registered in the database");
798 sqlFlushResult();
799 return;
801 string shard = row[0];
802 sqlFlushResult();
804 AdminExecutorServices.push_back (CAdminExecutorService(shard, server, sid));
806 nlinfo ("%s-%hu, server name %s, for shard %s connected and added in the list", serviceName.c_str(), sid.get(), server.c_str(), shard.c_str());
808 // send him services that should run on this server
809 sendAESInformation (sid);
812 // i'm disconnected from an admin executor service
813 static void cbNewAESDisconnection (const std::string &serviceName, TServiceId sid, void *arg)
815 TSockId from;
816 CCallbackNetBase *cnb = CUnifiedNetwork::getInstance ()->getNetBase (sid, from);
817 const CInetAddress &ia = cnb->hostAddress (from);
819 AESIT aesit = findAES (sid, false);
821 if (aesit == AdminExecutorServices.end ())
823 nlwarning ("Disconnection of %s-%hu that is not in my list (%s)", serviceName.c_str (), sid.get(), ia.asString ().c_str ());
824 return;
827 nlinfo ("%s-%hu, shard name %s, disconnected and removed from the list", serviceName.c_str(), sid.get(), (*aesit).Name.c_str ());
829 // we need to remove pending request
831 for(uint i = 0; i < (*aesit).WaitingRequestId.size (); i++)
833 subRequestWaitingNb ((*aesit).WaitingRequestId[i]);
836 AdminExecutorServices.erase (aesit);
839 // we receive an explicit registration message from an AES
840 void cbRegisterAES(CMessage &msgin, const std::string &serviceName, TServiceId sid)
842 if (!AllowExplicitAESRegistration)
844 nlwarning("Ignoring attempted AES registration because AllowExplicitAESRegistration==false");
845 return;
848 string server;
849 string shard;
852 msgin.serial(server);
853 msgin.serial(shard);
855 catch(...)
857 nlwarning("Ignoring attempted AES registration due to execption during message decoding");
858 return;
861 AdminExecutorServices.push_back (CAdminExecutorService(shard, server, sid));
863 nlinfo ("%s-%hu, server name %s, for shard %s connected and added in the list", serviceName.c_str(), sid.get(), server.c_str(), shard.c_str());
865 // send him services that should run on this server
866 sendAESInformation (sid);
869 static void cbView (CMessage &msgin, const std::string &serviceName, TServiceId sid)
871 uint32 rid;
872 msgin.serial (rid);
874 AESIT aesit = findAES (sid);
876 for (uint i = 0; i < (*aesit).WaitingRequestId.size();)
878 if ((*aesit).WaitingRequestId[i] == rid)
880 (*aesit).WaitingRequestId.erase ((*aesit).WaitingRequestId.begin ()+i);
882 else
884 i++;
888 MYSQL_ROW row = sqlQuery ("select distinct shard from service where server='%s'", (*aesit).Name.c_str ());
890 // shard name is find using the "service" table, so, if there s no shard name in it, it returns ???
891 string shardName;
892 if (row != NULL) shardName = row[0];
893 else shardName = DontUseDataBase? aesit->Shard: "???";
895 vector<string> vara, vala;
897 while ((uint32)msgin.getPos() < msgin.length())
899 vara.clear ();
900 vala.clear ();
902 // adding default row
903 vara.push_back ("shard");
904 vara.push_back ("server");
906 vala.push_back (shardName);
907 vala.push_back ((*aesit).Name);
909 uint32 i, nb;
910 string var, val;
912 msgin.serial (nb);
913 for (i = 0; i < nb; i++)
915 msgin.serial (var);
916 if (var == "__log")
918 vara.clear ();
919 vala.clear ();
921 vara.push_back (var);
924 if (vara.size() > 0 && vara[0] == "__log")
925 vala.push_back ("----- Result from Shard "+shardName+" Server "+(*aesit).Name+"\n");
927 msgin.serial (nb);
928 for (i = 0; i < nb; i++)
930 msgin.serial (val);
931 vala.push_back (val);
933 addRequestAnswer (rid, vara, vala);
935 sqlFlushResult();
937 // inc the NbReceived counter
938 addRequestReceived (rid);
941 TUnifiedCallbackItem CallbackArray[] =
943 { "REGISTER_AES", cbRegisterAES },
944 { "VIEW", cbView },
945 { "ADMIN_EMAIL", cbAdminEmail },
946 { "GRAPH_UPDATE", cbGraphUpdate },
950 ////////////////////////////////////////////////////////////////////////////////////////////////////////
951 ////////////////////////////////////////////////////////////////////////////////////////////////////////
952 ////////////////// CONNECTION TO THE CLIENT ////////////////////////////////////////////////////////////
953 ////////////////////////////////////////////////////////////////////////////////////////////////////////
954 ////////////////////////////////////////////////////////////////////////////////////////////////////////
956 void addRequest (const string &rawvarpath, TSockId from)
958 nlinfo ("addRequest from %s: '%s'", from->asString ().c_str (), rawvarpath.c_str ());
960 if(rawvarpath.empty ())
962 // send an empty string to say to php that there's nothing
963 string str;
964 sendString (from, str);
968 // special cases
971 if(rawvarpath == "reload")
973 // it means the we have to resend the list of services managed by AES from the mysql tables
974 for (AESIT aesit = AdminExecutorServices.begin(); aesit != AdminExecutorServices.end(); aesit++)
976 sendAESInformation ((*aesit).SId);
979 // send an empty string to say to php that there's nothing
980 string str;
981 sendString (from, str);
982 return;
986 // normal cases
989 CVarPath varpath (rawvarpath);
991 uint32 rid = newRequest (from);
993 for (uint i = 0; i < varpath.Destination.size (); i++)
995 string shard = varpath.Destination[i].first;
997 CVarPath subvarpath (varpath.Destination[i].second);
999 for (uint j = 0; j < subvarpath.Destination.size (); j++)
1001 string server = subvarpath.Destination[j].first;
1003 if (shard == "*" && server == "*")
1005 // Send the request to all online servers of all online shards
1007 AESIT aesit;
1008 for (aesit = AdminExecutorServices.begin(); aesit != AdminExecutorServices.end(); aesit++)
1010 addRequestWaitingNb (rid);
1011 (*aesit).WaitingRequestId.push_back (rid);
1013 CMessage msgout("AES_GET_VIEW");
1014 msgout.serial (rid);
1015 msgout.serial (subvarpath.Destination[j].second);
1016 CUnifiedNetwork::getInstance ()->send ((*aesit).SId, msgout);
1017 nlinfo ("REQUEST: Sent view '%s' to shard name %s 'AES-%hu'", subvarpath.Destination[j].second.c_str(), (*aesit).Name.c_str(), (*aesit).SId.get());
1020 else if (shard == "*" && server == "#")
1022 // Select all shard all server including offline one
1024 MYSQL_ROW row = sqlQuery ("select distinct server, shard from service");
1026 while (row != NULL)
1028 AESIT aesit = findAES (row[0], false);
1030 if (aesit != AdminExecutorServices.end())
1032 addRequestWaitingNb (rid);
1033 (*aesit).WaitingRequestId.push_back (rid);
1035 CMessage msgout("AES_GET_VIEW");
1036 msgout.serial (rid);
1037 msgout.serial (subvarpath.Destination[j].second);
1038 CUnifiedNetwork::getInstance ()->send ((*aesit).SId, msgout);
1039 nlinfo ("REQUEST: Sent view '%s' to shard name %s 'AES-%hu'", subvarpath.Destination[j].second.c_str(), (*aesit).Name.c_str(), (*aesit).SId.get());
1042 else if (server == "#")
1044 vector<string> vara, vala;
1046 // adding default row
1047 vara.push_back ("shard");
1048 vala.push_back (row[1]);
1050 vara.push_back ("server");
1051 vala.push_back (row[0]);
1053 vara.push_back ("service");
1054 vala.push_back ("AES");
1056 vara.push_back ("State");
1057 vala.push_back ("Offline");
1059 addRequestAnswer (rid, vara, vala);
1061 row = sqlNextRow ();
1063 sqlFlushResult();
1065 else if (server == "*" || server == "#")
1067 // Send the request to all online server of a specific shard
1069 MYSQL_ROW row = sqlQuery ("select distinct server from service where shard='%s'", shard.c_str ());
1071 while (row != NULL)
1073 AESIT aesit = findAES (row[0], false);
1075 if (aesit != AdminExecutorServices.end())
1077 addRequestWaitingNb (rid);
1078 (*aesit).WaitingRequestId.push_back (rid);
1080 CMessage msgout("AES_GET_VIEW");
1081 msgout.serial (rid);
1082 msgout.serial (subvarpath.Destination[j].second);
1083 CUnifiedNetwork::getInstance ()->send ((*aesit).SId, msgout);
1084 nlinfo ("REQUEST: Sent view '%s' to shard name %s 'AES-%hu'", subvarpath.Destination[j].second.c_str(), (*aesit).Name.c_str(), (*aesit).SId.get());
1087 else if (server == "#")
1089 vector<string> vara, vala;
1091 // adding default row
1092 vara.push_back ("shard");
1093 vala.push_back (shard);
1095 vara.push_back ("server");
1096 vala.push_back (row[0]);
1098 vara.push_back ("service");
1099 vala.push_back ("AES");
1101 vara.push_back ("State");
1102 vala.push_back ("Offline");
1104 addRequestAnswer (rid, vara, vala);
1106 row = sqlNextRow ();
1109 sqlFlushResult();
1111 else
1113 AESIT aesit = findAES (server, false);
1115 if (aesit != AdminExecutorServices.end())
1117 addRequestWaitingNb (rid);
1118 (*aesit).WaitingRequestId.push_back (rid);
1120 CMessage msgout("AES_GET_VIEW");
1121 msgout.serial (rid);
1122 msgout.serial (subvarpath.Destination[j].second);
1123 CUnifiedNetwork::getInstance ()->send ((*aesit).SId, msgout);
1124 nlinfo ("REQUEST: Sent view '%s' to shard name %s 'AES-%hu'", subvarpath.Destination[j].second.c_str(), (*aesit).Name.c_str(), (*aesit).SId.get());
1126 else
1128 nlwarning ("Server %s is not found in the list", server.c_str ());
1135 static void varRequestTimeout(CConfigFile::CVar &var)
1137 RequestTimeout = var.asInt();
1138 nlinfo ("Request timeout is now after %d seconds", RequestTimeout);
1141 static void varAdminAlertAccumlationTime (CConfigFile::CVar &var)
1143 AdminAlertAccumlationTime = var.asInt();
1147 ////////////////////////////////////////////////////////////////////////////////////////////////////////
1148 ////////////////////////////////////////////////////////////////////////////////////////////////////////
1149 ////////////////// SERVICE IMPLEMENTATION //////////////////////////////////////////////////////////////
1150 ////////////////////////////////////////////////////////////////////////////////////////////////////////
1151 ////////////////////////////////////////////////////////////////////////////////////////////////////////
1153 class CAdminService : public IService
1155 public:
1157 /// Init the service, load the universal time.
1158 void init ()
1160 setDefaultEmailParams (ConfigFile.getVar ("SMTPServer").asString (), ConfigFile.getVar("DefaultEmailFrom").asString(), "");
1162 sqlInit ();
1164 connectionWebInit ();
1166 //CVarPath toto ("[toto");
1168 //CVarPath toto ("*.*.*.*");
1169 //CVarPath toto ("[srv1,srv2].*.*.*");
1170 //CVarPath toto ("[svr1.svc1,srv2.svc2].*.*");
1171 //CVarPath toto ("[svr1.[svc1,svc2].*.var1,srv2.svc2.fe*.var2].toto");
1172 //CVarPath toto ("[svr1.svc1.*.toto,srv2.svc2.*.tata]");
1174 CUnifiedNetwork::getInstance ()->setServiceUpCallback ("AES", cbNewAESConnection);
1175 CUnifiedNetwork::getInstance ()->setServiceDownCallback ("AES", cbNewAESDisconnection);
1177 varRequestTimeout (ConfigFile.getVar ("RequestTimeout"));
1178 ConfigFile.setCallback("RequestTimeout", &varRequestTimeout);
1180 varAdminAlertAccumlationTime (ConfigFile.getVar ("AdminAlertAccumlationTime"));
1181 ConfigFile.setCallback("AdmimAlertAccumlationTime", &varAdminAlertAccumlationTime);
1185 bool update ()
1187 cleanRequest ();
1188 connectionWebUpdate ();
1190 updateSendAdminAlert ();
1191 return true;
1194 void release ()
1196 connectionWebRelease ();
1201 /// Admin Service
1202 NLNET_SERVICE_MAIN (CAdminService, "AS", "admin_service", 49996, CallbackArray, NELNS_CONFIG, NELNS_LOGS);
1205 NLMISC_COMMAND (getViewAS, "send a view and receive an array as result", "<varpath>")
1207 string cmd;
1208 for (uint i = 0; i < args.size(); i++)
1210 if (i != 0) cmd += " ";
1211 cmd += args[i];
1214 addRequest (cmd, NULL);
1216 return true;
1219 NLMISC_COMMAND (clearRequests, "clear all pending requests", "")
1221 if(args.size() != 0) return false;
1223 // for all request, set the NbWaiting to NbReceived, next cleanRequest() will send answer and clear all request
1224 for (uint i = 0 ; i < Requests.size (); i++)
1226 if (Requests[i].NbWaiting <= Requests[i].NbReceived)
1228 Requests[i].NbWaiting = Requests[i].NbReceived;
1232 return true;
1235 NLMISC_COMMAND (displayRequests, "display all pending requests", "")
1237 if(args.size() != 0) return false;
1239 log.displayNL ("Display %d pending requests", Requests.size ());
1240 for (uint i = 0 ; i < Requests.size (); i++)
1242 log.displayNL ("id: %d wait: %d recv: %d from: %s nbrow: %d", Requests[i].Id, Requests[i].NbWaiting, Requests[i].NbReceived, Requests[i].From->asString ().c_str (), Requests[i].NbRow);
1244 log.displayNL ("End of display pending requests");
1246 return true;
1249 NLMISC_COMMAND (generateAlert, "generate an alert", "<text>")
1251 if(args.size() != 1) return false;
1253 sendAdminAlert (args[0].c_str());
1255 return true;