Merge branch 'main/rendor-staging' into main/atys-live
[ryzomcore.git] / nelns / admin_service / admin_service.cpp
blob1788ff5ef61c0f9905d29cc31189ad62af853de4
1 // NeLNS - MMORPG Framework <http://dev.ryzom.com/projects/nel/>
2 // Copyright (C) 2010 Winch Gate Property Limited
3 //
4 // This source file has been modified by the following contributors:
5 // Copyright (C) 2019 Jan BOON (Kaetemi) <jan.boon@kaetemi.be>
6 //
7 // This program is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Affero General Public License as
9 // published by the Free Software Foundation, either version 3 of the
10 // License, or (at your option) any later version.
12 // This program is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU Affero General Public License for more details.
17 // You should have received a copy of the GNU Affero General Public License
18 // along with this program. If not, see <http://www.gnu.org/licenses/>.
20 #ifdef HAVE_CONFIG_H
21 #include "config.h"
22 #endif // HAVE_CONFIG_H
24 #ifndef NELNS_CONFIG
25 #define NELNS_CONFIG ""
26 #endif // NELNS_CONFIG
28 #ifndef NELNS_LOGS
29 #define NELNS_LOGS ""
30 #endif // NELNS_LOGS
32 #include "nel/misc/types_nl.h"
34 #include <string>
35 #include <list>
37 #ifdef NL_OS_WINDOWS
38 #include <winsock2.h>
39 #include <windows.h>
40 typedef unsigned long ulong;
41 #endif
43 #include <mysql.h>
44 #ifndef LIBMARIADB
45 #include <mysql_version.h>
46 #endif
48 #include "nel/misc/debug.h"
49 #include "nel/misc/config_file.h"
50 #include "nel/misc/path.h"
51 #include "nel/misc/command.h"
53 #include "nel/net/service.h"
54 #include "nel/net/varpath.h"
55 #include "nel/net/email.h"
57 #include "connection_web.h"
61 // Namespaces
64 using namespace std;
65 using namespace NLMISC;
66 using namespace NLNET;
70 // NeL Variables (for config file, etc)
73 // this variable should be used in conjunction with UseExplicitAESRegistration.
74 // the AS / AES registration process works as follows:
75 // - aes creates a layer 5 connection to as
76 // - as gets a serviceUp callback and looks in the database to try to find a match for the AES
77 // - if the match fails then AS sends a reject message to the AES
78 // - when the AES receives the reject message they check their UseExplicitAESRegistration flag - if it's set they
79 // attempt an explicit connection, sending the info required by the AS that would normally come from the database
80 // - when the AS receives an explicit registration, it verifies the state of the AllowExplicitAESRegistration flag
81 // and completes the registration work that failed earlier due to the database access failure
82 CVariable<bool> AllowExplicitAESRegistration("as","AllowExplicitAESRegistration","flag to allow AES services to register explicitly",false,0,true);
84 // this variable allows one to launch an AS on a machine that doesn't have a database setup
85 // the functionality of the AS is reduced (particularly in respect to alarms and graphs which are configured via the database)
86 CVariable<bool> DontUseDataBase("as","DontUseDataBase","if this flag is set calls to the database will be ignored",false,0,true);
90 // Structures
93 struct CRequest
95 CRequest (uint32 id, TSockId from) : Id(id), NbWaiting(0), NbReceived(0), From(from), NbRow(0), NbLines(1)
97 Time = CTime::getSecondsSince1970 ();
100 uint32 Id;
101 uint NbWaiting;
102 uint32 NbReceived;
103 TSockId From;
104 uint32 Time; // when the request was ask
106 uint32 NbRow;
107 uint32 NbLines;
109 vector<vector<string> > Array; // it's the 2 dimensional array that will be send to the php for variables
110 vector<string> Log; // this log contains the answer if a command was asked, othewise, Array contains the results
112 uint32 getVariable(const string &variable)
114 for (uint32 i = 0; i < NbRow; i++)
115 if (Array[i][0] == variable)
116 return i;
118 // need to add the variable
119 vector<string> NewRow;
120 NewRow.resize (NbLines);
121 NewRow[0] = variable;
122 Array.push_back (NewRow);
123 return NbRow++;
126 void addLine ()
128 for (uint32 i = 0; i < NbRow; i++)
129 Array[i].push_back("");
131 NbLines++;
134 void display ()
136 if (Log.empty())
138 nlinfo ("Display answer array for request %d: %d row %d lines", Id, NbRow, NbLines);
139 for (uint i = 0; i < NbLines; i++)
141 for (uint j = 0; j < NbRow; j++)
143 nlassert (Array.size () == NbRow);
144 InfoLog->displayRaw ("%-20s", Array[j][i].c_str());
146 InfoLog->displayRawNL ("");
148 InfoLog->displayRawNL ("End of the array");
150 else
152 nlinfo ("Display the log for request %d: %d lines", Id, Log.size());
153 for (uint i = 0; i < Log.size(); i++)
155 InfoLog->displayRaw ("%s", Log[i].c_str());
157 InfoLog->displayRawNL ("End of the log");
162 struct CAdminExecutorService
164 CAdminExecutorService (const string &shard, const string &name, TServiceId sid) : Shard(shard), SId(sid), Name(name) { }
166 string Shard; /// Name of the shard
167 TServiceId SId; /// uniq number to identify the AES
168 string Name; /// name of the admin executor service
170 vector<uint32> WaitingRequestId; /// contains all request that the server hasn't reply yet
174 typedef list<CAdminExecutorService> TAdminExecutorServices;
175 typedef list<CAdminExecutorService>::iterator AESIT;
179 // Variables
182 TAdminExecutorServices AdminExecutorServices;
184 MYSQL *DatabaseConnection = NULL;
186 vector<CRequest> Requests;
188 uint32 RequestTimeout = 5; // in second
190 // cumulate 5 seconds of alert
191 sint32 AdminAlertAccumlationTime = 5;
195 // Functions
198 AESIT findAES (TServiceId sid, bool asrt = true)
200 AESIT aesit;
201 for (aesit = AdminExecutorServices.begin(); aesit != AdminExecutorServices.end(); aesit++)
202 if ((*aesit).SId == sid)
203 break;
205 if (asrt)
206 nlassert (aesit != AdminExecutorServices.end());
207 return aesit;
210 AESIT findAES (const string &name, bool asrt = true)
212 AESIT aesit;
213 for (aesit = AdminExecutorServices.begin(); aesit != AdminExecutorServices.end(); aesit++)
214 if ((*aesit).Name == name)
215 break;
217 if (asrt)
218 nlassert (aesit != AdminExecutorServices.end());
220 return aesit;
225 // SQL helpers
228 MYSQL_RES *sqlCurrentQueryResult = NULL;
230 MYSQL_ROW sqlQuery (const char *format, ...)
232 if (DontUseDataBase)
233 return 0;
235 char *query;
236 NLMISC_CONVERT_VARGS (query, format, 1024);
238 if (DatabaseConnection == 0)
240 nlwarning ("MYSQL: mysql_query (%s) failed: DatabaseConnection is 0", query);
241 return NULL;
244 int ret = mysql_query (DatabaseConnection, query);
245 if (ret != 0)
247 nlwarning ("MYSQL: mysql_query () failed for query '%s': %s", query, mysql_error(DatabaseConnection));
248 return 0;
251 sqlCurrentQueryResult = mysql_store_result(DatabaseConnection);
252 if (sqlCurrentQueryResult == 0)
254 nlwarning ("MYSQL: mysql_store_result () failed for query '%s': %s", query, mysql_error(DatabaseConnection));
255 return 0;
258 MYSQL_ROW row = mysql_fetch_row(sqlCurrentQueryResult);
259 if (row == 0)
261 nlwarning ("MYSQL: mysql_fetch_row () failed for query '%s': %s", query, mysql_error(DatabaseConnection));
264 nldebug ("MYSQL: sqlQuery(%s) returns %d rows", query, mysql_num_rows(sqlCurrentQueryResult));
266 return row;
269 MYSQL_ROW sqlNextRow ()
271 if (DontUseDataBase)
272 return 0;
274 if (sqlCurrentQueryResult == 0)
275 return 0;
277 return mysql_fetch_row(sqlCurrentQueryResult);
280 void sqlFlushResult()
282 if (DontUseDataBase)
283 return;
285 if (sqlCurrentQueryResult == NULL)
286 return;
288 mysql_free_result(sqlCurrentQueryResult);
289 sqlCurrentQueryResult = NULL;
294 // Admin functions
297 string Email;
298 uint32 FirstEmailTime = 0;
300 void sendAdminAlert (const char *format, ...)
302 char *text;
303 NLMISC_CONVERT_VARGS (text, format, 4096);
305 if (AdminAlertAccumlationTime == -1)
307 // we don't send email so just display a warning
308 nlwarning ("ALERT: %s", text);
310 else
312 if(Email.empty() && FirstEmailTime == 0)
314 Email += text;
315 FirstEmailTime = CTime::getSecondsSince1970();
317 else
319 Email += "\n";
320 Email += text;
322 nldebug ("ALERT: pushing email into queue: %s", text);
326 void updateSendAdminAlert ()
328 if(!Email.empty() && FirstEmailTime != 0 && AdminAlertAccumlationTime >=0 && CTime::getSecondsSince1970() > FirstEmailTime + AdminAlertAccumlationTime)
330 vector<string> lines;
331 explode (Email, string("\n"), lines, true);
333 if (!lines.empty())
336 if (IService::getInstance()->ConfigFile.exists("SysLogPath") && IService::getInstance()->ConfigFile.exists("SysLogParams"))
338 // syslog
339 string param;
340 if (lines.size() > 1)
342 param = "Multiple problems, first is: ";
344 param += lines[0];
345 string res = toString(IService::getInstance()->ConfigFile.getVar("SysLogParams").asString().c_str(), param.c_str());
346 launchProgram(IService::getInstance()->ConfigFile.getVar("SysLogPath").asString(), res);
349 if (IService::getInstance()->ConfigFile.exists("AdminEmail"))
351 // email
352 string subject;
353 if (lines.size() == 1)
355 subject = lines[0];
357 else
359 subject = "Multiple problems";
362 std::string from;
363 if(IService::getInstance()->ConfigFile.exists("AdminEmailFrom"))
364 from = IService::getInstance()->ConfigFile.getVar("AdminEmailFrom").asString();
365 CConfigFile::CVar &var = IService::getInstance()->ConfigFile.getVar("AdminEmail");
366 for (uint i = 0; i < var.size(); i++)
368 if (!sendEmail ("", from, var.asString(i), subject, Email))
370 nlwarning ("Can't send email to '%s'", var.asString(i).c_str());
372 else
374 nlinfo ("ALERT: Sent email to admin %s the subject: %s", var.asString(i).c_str(), subject.c_str());
380 Email = "";
381 FirstEmailTime = 0;
386 static void cbAdminEmail (CMessage &msgin, const std::string &serviceName, TServiceId sid)
388 string str;
389 msgin.serial(str);
390 sendAdminAlert (str.c_str());
393 static void cbGraphUpdate (CMessage &msgin, const std::string &serviceName, TServiceId sid)
395 uint32 CurrentTime;
396 msgin.serial (CurrentTime);
398 while (msgin.getPos() < (sint32)msgin.length())
400 string var, service;
401 sint32 val;
402 msgin.serial (service, var, val);
404 AESIT aesit = findAES (sid);
406 string shard, server;
407 shard = (*aesit).Shard;
408 server = (*aesit).Name;
410 if (!shard.empty() && !server.empty() && !service.empty() && !var.empty())
412 string path = CPath::standardizePath (IService::getInstance()->ConfigFile.getVar("RRDVarPath").asString());
413 string rrdfilename = path + shard+"."+server+"."+service+"."+var+".rrd";
415 string arg;
417 if (!NLMISC::CFile::fileExists(rrdfilename))
419 MYSQL_ROW row = sqlQuery ("select graph_update from variable where path like '%%%s' and graph_update!=0", var.c_str());
420 if (row != NULL)
422 uint32 freq = atoi(row[0]);
423 arg = "create "+rrdfilename+" --step "+toString(freq)+" DS:var:GAUGE:"+toString(freq*2)+":U:U RRA:AVERAGE:0.5:1:1000 RRA:AVERAGE:0.5:10:1000 RRA:AVERAGE:0.5:100:1000";
424 launchProgram(IService::getInstance()->ConfigFile.getVar("RRDToolPath").asString(), arg);
426 else
428 nlwarning ("Can't create the rrd because no graph_update in database");
430 sqlFlushResult();
433 arg = "update " + rrdfilename + " " + toString (CurrentTime) + ":" + toString(val);
434 launchProgram(IService::getInstance()->ConfigFile.getVar("RRDToolPath").asString(), arg);
436 else
438 nlwarning ("Shard server service var val is empty");
445 // Request functions
448 uint32 newRequest (TSockId from)
450 static uint32 NextId = 5461231;
452 Requests.push_back (CRequest(NextId, from));
454 return NextId++;
457 void addRequestWaitingNb (uint32 rid)
459 for (uint i = 0 ; i < Requests.size (); i++)
461 if (Requests[i].Id == rid)
463 Requests[i].NbWaiting++;
464 Requests[i].Time = CTime::getSecondsSince1970 ();
465 return;
468 nlwarning ("REQUEST: Received an answer from an unknown resquest %d (perhaps due to a AS timeout)", rid);
471 void subRequestWaitingNb (uint32 rid)
473 for (uint i = 0 ; i < Requests.size (); i++)
475 if (Requests[i].Id == rid)
477 Requests[i].NbWaiting--;
478 return;
481 nlwarning ("REQUEST: Received an answer from an unknown resquest %d (perhaps due to a AS timeout)", rid);
484 void addRequestReceived (uint32 rid)
486 for (uint i = 0 ; i < Requests.size (); i++)
488 if (Requests[i].Id == rid)
490 Requests[i].NbReceived++;
491 return;
494 nlwarning ("REQUEST: Received an answer from an unknown resquest %d (perhaps due to a AS timeout)", rid);
497 void addRequestAnswer (uint32 rid, const vector<string> &variables, const vector<string> &values)
499 for (uint i = 0 ; i < Requests.size (); i++)
501 Requests[i].addLine ();
502 if (Requests[i].Id == rid)
504 if (!variables.empty() && variables[0]=="__log")
506 nlassert (variables.size() == 1);
508 for (uint j = 0; j < values.size(); j++)
510 Requests[i].Log.push_back (values[j]);
513 else
515 nlassert (variables.size() == values.size ());
516 for (uint j = 0; j < variables.size(); j++)
518 uint32 pos = Requests[i].getVariable (variables[j]);
519 Requests[i].Array[pos][Requests[i].NbLines-1] = values[j];
522 return;
525 nlwarning ("REQUEST: Received an answer from an unknown resquest %d (perhaps due to a AS timeout)", rid);
528 bool emptyRequest (uint32 rid)
530 for (uint i = 0 ; i < Requests.size (); i++)
532 if (Requests[i].Id == rid && Requests[i].NbWaiting != 0)
534 return false;
537 return true;
540 void cleanRequest ()
542 uint32 currentTime = CTime::getSecondsSince1970 ();
544 bool timeout;
546 for (uint i = 0 ; i < Requests.size ();)
548 // the AES doesn't answer quickly
549 timeout = (currentTime >= Requests[i].Time+RequestTimeout);
551 if (Requests[i].NbWaiting <= Requests[i].NbReceived || timeout)
553 // the request is over, send to the php
555 string str;
557 if (timeout)
559 nlwarning ("REQUEST: Request %d timeouted, only %d on %d services have replied", Requests[i].Id, Requests[i].NbReceived, Requests[i].NbWaiting);
562 if (Requests[i].Log.empty())
564 if (Requests[i].NbRow == 0 && timeout)
566 str = "1 ((TIMEOUT))";
568 else
570 str = toString(Requests[i].NbRow) + " ";
571 for (uint k = 0; k < Requests[i].NbLines; k++)
573 for (uint j = 0; j < Requests[i].NbRow; j++)
575 nlassert (Requests[i].Array.size () == Requests[i].NbRow);
576 if (Requests[i].Array[j][k].empty ())
577 str += "??? ";
578 else
580 str += Requests[i].Array[j][k];
581 if (timeout)
582 str += "((TIMEOUT))";
583 str += " ";
589 else
591 for (uint k = 0; k < Requests[i].Log.size(); k++)
593 str += Requests[i].Log[k];
594 if (timeout)
595 str += "((TIMEOUT))";
599 sendString (Requests[i].From, str);
601 // set to 0 to erase it
602 Requests[i].NbWaiting = 0;
605 if (Requests[i].NbWaiting == 0)
607 Requests.erase (Requests.begin ()+i);
609 else
611 i++;
618 // SQL functions
621 void sqlInit ()
623 if (DontUseDataBase)
624 return;
626 MYSQL *db = mysql_init(NULL);
627 if(db == NULL)
629 nlerror ("mysql_init() failed");
632 #if LIBMYSQL_VERSION_ID < 80000
633 my_bool opt = true;
634 #else
635 bool opt = true;
636 #endif
637 if (mysql_options (db, MYSQL_OPT_RECONNECT, &opt))
639 mysql_close(db);
640 DatabaseConnection = 0;
641 nlerror("mysql_options() failed for database connection to '%s'", IService::getInstance()->ConfigFile.getVar("DatabaseHost").asString().c_str());
642 return;
645 DatabaseConnection = mysql_real_connect(db,
646 IService::getInstance()->ConfigFile.getVar("DatabaseHost").asString().c_str(),
647 IService::getInstance()->ConfigFile.getVar("DatabaseLogin").asString().c_str(),
648 IService::getInstance()->ConfigFile.getVar("DatabasePassword").asString().c_str(),
649 IService::getInstance()->ConfigFile.getVar("DatabaseName").asString().c_str(),
650 0,NULL,0);
651 if (DatabaseConnection == NULL || DatabaseConnection != db)
653 nlerror ("mysql_real_connect() failed to '%s' with login '%s' and database name '%s' with %s",
654 IService::getInstance()->ConfigFile.getVar("DatabaseHost").asString().c_str(),
655 IService::getInstance()->ConfigFile.getVar("DatabaseLogin").asString().c_str(),
656 IService::getInstance()->ConfigFile.getVar("DatabaseName").asString().c_str(),
657 (IService::getInstance()->ConfigFile.getVar("DatabasePassword").asString().empty()?"empty password":"password")
661 #if MYSQL_VERSION_ID < 50019
662 opt = true;
663 if (mysql_options (DatabaseConnection, MYSQL_OPT_RECONNECT, &opt))
665 mysql_close(db);
666 DatabaseConnection = 0;
667 nlerror("mysql_options() failed for database connection to '%s'", IService::getInstance()->ConfigFile.getVar("DatabaseHost").asString().c_str());
668 return;
670 #endif
674 ////////////////////////////////////////////////////////////////////////////////////////////////////////
675 ////////////////////////////////////////////////////////////////////////////////////////////////////////
676 ////////////////// CONNECTION TO THE AES ///////////////////////////////////////////////////////////////
677 ////////////////////////////////////////////////////////////////////////////////////////////////////////
678 ////////////////////////////////////////////////////////////////////////////////////////////////////////
680 void sendAESInformation (TServiceId sid)
682 AESIT aesit = findAES (sid);
684 vector<string> information;
686 CMessage msgout("AES_INFO");
689 // send services that should be running on this AES
691 information.clear ();
692 MYSQL_ROW row = sqlQuery ("select name from service where server='%s'", (*aesit).Name.c_str());
693 while (row != NULL)
695 string service = row[0];
696 nlinfo ("Adding '%s' in registered services to AES-%hu", row[0], sid.get());
697 information.push_back (service);
698 row = sqlNextRow ();
700 sqlFlushResult();
701 msgout.serialCont (information);
704 // send variable alarms for services that should running on this AES
706 information.clear ();
707 row = sqlQuery ("select path, error_bound, alarm_order from variable where error_bound!=-1");
708 while (row != NULL)
710 nlinfo ("Adding '%s' '%s' '%s' in alarm to AES-%hu", row[0], row[1], row[2], sid.get());
711 information.push_back (row[0]);
712 information.push_back (row[1]);
713 information.push_back (row[2]);
714 row = sqlNextRow ();
716 sqlFlushResult();
717 msgout.serialCont (information);
720 // send graph update for services that should running on this AES
722 information.clear ();
723 row = sqlQuery ("select path, graph_update from variable where graph_update!=0");
724 while (row != NULL)
726 CVarPath varpath (row[0]);
728 for(uint i = 0; i < varpath.Destination.size(); i++)
730 string a = varpath.Destination[i].first, b = (*aesit).Shard;
731 if(varpath.Destination[i].first == "*" || varpath.Destination[i].first == (*aesit).Shard)
733 CVarPath varpath2 (varpath.Destination[i].second);
735 for(uint j = 0; j < varpath2.Destination.size(); j++)
737 string c = varpath2.Destination[j].first, d = (*aesit).Name;
738 if(varpath2.Destination[j].first == "*" || varpath2.Destination[j].first == (*aesit).Name)
740 nlinfo ("Adding '%s' '%s' in graph to AES-%hu", row[0], row[1], sid.get());
741 information.push_back (row[0]);
742 information.push_back (row[1]);
747 row = sqlNextRow ();
749 sqlFlushResult();
750 msgout.serialCont (information);
752 nlinfo ("Sending all information about %s AES-%hu (hostedservices, alarms,grapupdate)", (*aesit).Name.c_str(), (*aesit).SId.get());
753 CUnifiedNetwork::getInstance ()->send (sid, msgout);
756 void rejectAES(TServiceId sid, const string &res)
758 CMessage msgout("REJECTED");
759 msgout.serial ((string &)res);
760 CUnifiedNetwork::getInstance ()->send (sid, msgout);
763 // i'm connected to a new admin executor service
764 static void cbNewAESConnection (const std::string &serviceName, TServiceId sid, void *arg)
766 TSockId from;
767 CCallbackNetBase *cnb = CUnifiedNetwork::getInstance ()->getNetBase (sid, from);
768 const CInetAddress &ia = cnb->hostAddress (from);
770 AESIT aesit = findAES (sid, false);
772 if (aesit != AdminExecutorServices.end ())
774 nlwarning ("Connection of an AES that is already in the list (%s)", ia.asString ().c_str ());
775 rejectAES (sid, "This AES is already in the AS list");
776 return;
779 MYSQL_ROW row = sqlQuery ("select name from server where address='%s'", ia.ipAddress().c_str());
780 if (row == NULL)
782 if (!AllowExplicitAESRegistration)
784 nlwarning ("Connection of an AES that is not in database server list (%s)", ia.asString ().c_str ());
786 else
788 nlinfo ("Rejecting auto-connection of an AES (%s) - this should provke explicitly reconnect", ia.asString ().c_str ());
790 rejectAES (sid, "This AES is not registered in the database");
791 sqlFlushResult();
792 return;
794 string server = row[0];
795 sqlFlushResult();
797 row = sqlQuery ("select shard from service where server='%s'", server.c_str());
798 if (row == NULL)
800 nlwarning ("Connection of an AES that is not in database server list (%s)", ia.asString ().c_str ());
801 rejectAES (sid, "This AES is not registered in the database");
802 sqlFlushResult();
803 return;
805 string shard = row[0];
806 sqlFlushResult();
808 AdminExecutorServices.push_back (CAdminExecutorService(shard, server, sid));
810 nlinfo ("%s-%hu, server name %s, for shard %s connected and added in the list", serviceName.c_str(), sid.get(), server.c_str(), shard.c_str());
812 // send him services that should run on this server
813 sendAESInformation (sid);
816 // i'm disconnected from an admin executor service
817 static void cbNewAESDisconnection (const std::string &serviceName, TServiceId sid, void *arg)
819 TSockId from;
820 CCallbackNetBase *cnb = CUnifiedNetwork::getInstance ()->getNetBase (sid, from);
821 const CInetAddress &ia = cnb->hostAddress (from);
823 AESIT aesit = findAES (sid, false);
825 if (aesit == AdminExecutorServices.end ())
827 nlwarning ("Disconnection of %s-%hu that is not in my list (%s)", serviceName.c_str (), sid.get(), ia.asString ().c_str ());
828 return;
831 nlinfo ("%s-%hu, shard name %s, disconnected and removed from the list", serviceName.c_str(), sid.get(), (*aesit).Name.c_str ());
833 // we need to remove pending request
835 for(uint i = 0; i < (*aesit).WaitingRequestId.size (); i++)
837 subRequestWaitingNb ((*aesit).WaitingRequestId[i]);
840 AdminExecutorServices.erase (aesit);
843 // we receive an explicit registration message from an AES
844 void cbRegisterAES(CMessage &msgin, const std::string &serviceName, TServiceId sid)
846 if (!AllowExplicitAESRegistration)
848 nlwarning("Ignoring attempted AES registration because AllowExplicitAESRegistration==false");
849 return;
852 string server;
853 string shard;
856 msgin.serial(server);
857 msgin.serial(shard);
859 catch(...)
861 nlwarning("Ignoring attempted AES registration due to execption during message decoding");
862 return;
865 AdminExecutorServices.push_back (CAdminExecutorService(shard, server, sid));
867 nlinfo ("%s-%hu, server name %s, for shard %s connected and added in the list", serviceName.c_str(), sid.get(), server.c_str(), shard.c_str());
869 // send him services that should run on this server
870 sendAESInformation (sid);
873 static void cbView (CMessage &msgin, const std::string &serviceName, TServiceId sid)
875 uint32 rid;
876 msgin.serial (rid);
878 AESIT aesit = findAES (sid);
880 for (uint i = 0; i < (*aesit).WaitingRequestId.size();)
882 if ((*aesit).WaitingRequestId[i] == rid)
884 (*aesit).WaitingRequestId.erase ((*aesit).WaitingRequestId.begin ()+i);
886 else
888 i++;
892 MYSQL_ROW row = sqlQuery ("select distinct shard from service where server='%s'", (*aesit).Name.c_str ());
894 // shard name is find using the "service" table, so, if there s no shard name in it, it returns ???
895 string shardName;
896 if (row != NULL) shardName = row[0];
897 else shardName = DontUseDataBase? aesit->Shard: "???";
899 vector<string> vara, vala;
901 while ((uint32)msgin.getPos() < msgin.length())
903 vara.clear ();
904 vala.clear ();
906 // adding default row
907 vara.push_back ("shard");
908 vara.push_back ("server");
910 vala.push_back (shardName);
911 vala.push_back ((*aesit).Name);
913 uint32 i, nb;
914 string var, val;
916 msgin.serial (nb);
917 for (i = 0; i < nb; i++)
919 msgin.serial (var);
920 if (var == "__log")
922 vara.clear ();
923 vala.clear ();
925 vara.push_back (var);
928 if (vara.size() > 0 && vara[0] == "__log")
929 vala.push_back ("----- Result from Shard "+shardName+" Server "+(*aesit).Name+"\n");
931 msgin.serial (nb);
932 for (i = 0; i < nb; i++)
934 msgin.serial (val);
935 vala.push_back (val);
937 addRequestAnswer (rid, vara, vala);
939 sqlFlushResult();
941 // inc the NbReceived counter
942 addRequestReceived (rid);
945 TUnifiedCallbackItem CallbackArray[] =
947 { "REGISTER_AES", cbRegisterAES },
948 { "VIEW", cbView },
949 { "ADMIN_EMAIL", cbAdminEmail },
950 { "GRAPH_UPDATE", cbGraphUpdate },
954 ////////////////////////////////////////////////////////////////////////////////////////////////////////
955 ////////////////////////////////////////////////////////////////////////////////////////////////////////
956 ////////////////// CONNECTION TO THE CLIENT ////////////////////////////////////////////////////////////
957 ////////////////////////////////////////////////////////////////////////////////////////////////////////
958 ////////////////////////////////////////////////////////////////////////////////////////////////////////
960 void addRequest (const string &rawvarpath, TSockId from)
962 nlinfo ("addRequest from %s: '%s'", from->asString ().c_str (), rawvarpath.c_str ());
964 if(rawvarpath.empty ())
966 // send an empty string to say to php that there's nothing
967 string str;
968 sendString (from, str);
972 // special cases
975 if(rawvarpath == "reload")
977 // it means the we have to resend the list of services managed by AES from the mysql tables
978 for (AESIT aesit = AdminExecutorServices.begin(); aesit != AdminExecutorServices.end(); aesit++)
980 sendAESInformation ((*aesit).SId);
983 // send an empty string to say to php that there's nothing
984 string str;
985 sendString (from, str);
986 return;
990 // normal cases
993 CVarPath varpath (rawvarpath);
995 uint32 rid = newRequest (from);
997 for (uint i = 0; i < varpath.Destination.size (); i++)
999 string shard = varpath.Destination[i].first;
1001 CVarPath subvarpath (varpath.Destination[i].second);
1003 for (uint j = 0; j < subvarpath.Destination.size (); j++)
1005 string server = subvarpath.Destination[j].first;
1007 if (shard == "*" && server == "*")
1009 // Send the request to all online servers of all online shards
1011 AESIT aesit;
1012 for (aesit = AdminExecutorServices.begin(); aesit != AdminExecutorServices.end(); aesit++)
1014 addRequestWaitingNb (rid);
1015 (*aesit).WaitingRequestId.push_back (rid);
1017 CMessage msgout("AES_GET_VIEW");
1018 msgout.serial (rid);
1019 msgout.serial (subvarpath.Destination[j].second);
1020 CUnifiedNetwork::getInstance ()->send ((*aesit).SId, msgout);
1021 nlinfo ("REQUEST: Sent view '%s' to shard name %s 'AES-%hu'", subvarpath.Destination[j].second.c_str(), (*aesit).Name.c_str(), (*aesit).SId.get());
1024 else if (shard == "*" && server == "#")
1026 // Select all shard all server including offline one
1028 MYSQL_ROW row = sqlQuery ("select distinct server, shard from service");
1030 while (row != NULL)
1032 AESIT aesit = findAES (row[0], false);
1034 if (aesit != AdminExecutorServices.end())
1036 addRequestWaitingNb (rid);
1037 (*aesit).WaitingRequestId.push_back (rid);
1039 CMessage msgout("AES_GET_VIEW");
1040 msgout.serial (rid);
1041 msgout.serial (subvarpath.Destination[j].second);
1042 CUnifiedNetwork::getInstance ()->send ((*aesit).SId, msgout);
1043 nlinfo ("REQUEST: Sent view '%s' to shard name %s 'AES-%hu'", subvarpath.Destination[j].second.c_str(), (*aesit).Name.c_str(), (*aesit).SId.get());
1046 else if (server == "#")
1048 vector<string> vara, vala;
1050 // adding default row
1051 vara.push_back ("shard");
1052 vala.push_back (row[1]);
1054 vara.push_back ("server");
1055 vala.push_back (row[0]);
1057 vara.push_back ("service");
1058 vala.push_back ("AES");
1060 vara.push_back ("State");
1061 vala.push_back ("Offline");
1063 addRequestAnswer (rid, vara, vala);
1065 row = sqlNextRow ();
1067 sqlFlushResult();
1069 else if (server == "*" || server == "#")
1071 // Send the request to all online server of a specific shard
1073 MYSQL_ROW row = sqlQuery ("select distinct server from service where shard='%s'", shard.c_str ());
1075 while (row != NULL)
1077 AESIT aesit = findAES (row[0], false);
1079 if (aesit != AdminExecutorServices.end())
1081 addRequestWaitingNb (rid);
1082 (*aesit).WaitingRequestId.push_back (rid);
1084 CMessage msgout("AES_GET_VIEW");
1085 msgout.serial (rid);
1086 msgout.serial (subvarpath.Destination[j].second);
1087 CUnifiedNetwork::getInstance ()->send ((*aesit).SId, msgout);
1088 nlinfo ("REQUEST: Sent view '%s' to shard name %s 'AES-%hu'", subvarpath.Destination[j].second.c_str(), (*aesit).Name.c_str(), (*aesit).SId.get());
1091 else if (server == "#")
1093 vector<string> vara, vala;
1095 // adding default row
1096 vara.push_back ("shard");
1097 vala.push_back (shard);
1099 vara.push_back ("server");
1100 vala.push_back (row[0]);
1102 vara.push_back ("service");
1103 vala.push_back ("AES");
1105 vara.push_back ("State");
1106 vala.push_back ("Offline");
1108 addRequestAnswer (rid, vara, vala);
1110 row = sqlNextRow ();
1113 sqlFlushResult();
1115 else
1117 AESIT aesit = findAES (server, false);
1119 if (aesit != AdminExecutorServices.end())
1121 addRequestWaitingNb (rid);
1122 (*aesit).WaitingRequestId.push_back (rid);
1124 CMessage msgout("AES_GET_VIEW");
1125 msgout.serial (rid);
1126 msgout.serial (subvarpath.Destination[j].second);
1127 CUnifiedNetwork::getInstance ()->send ((*aesit).SId, msgout);
1128 nlinfo ("REQUEST: Sent view '%s' to shard name %s 'AES-%hu'", subvarpath.Destination[j].second.c_str(), (*aesit).Name.c_str(), (*aesit).SId.get());
1130 else
1132 nlwarning ("Server %s is not found in the list", server.c_str ());
1139 static void varRequestTimeout(CConfigFile::CVar &var)
1141 RequestTimeout = var.asInt();
1142 nlinfo ("Request timeout is now after %d seconds", RequestTimeout);
1145 static void varAdminAlertAccumlationTime (CConfigFile::CVar &var)
1147 AdminAlertAccumlationTime = var.asInt();
1151 ////////////////////////////////////////////////////////////////////////////////////////////////////////
1152 ////////////////////////////////////////////////////////////////////////////////////////////////////////
1153 ////////////////// SERVICE IMPLEMENTATION //////////////////////////////////////////////////////////////
1154 ////////////////////////////////////////////////////////////////////////////////////////////////////////
1155 ////////////////////////////////////////////////////////////////////////////////////////////////////////
1157 class CAdminService : public IService
1159 public:
1161 /// Init the service, load the universal time.
1162 void init ()
1164 setDefaultEmailParams (ConfigFile.getVar ("SMTPServer").asString (), ConfigFile.getVar("DefaultEmailFrom").asString(), "");
1166 sqlInit ();
1168 connectionWebInit ();
1170 //CVarPath toto ("[toto");
1172 //CVarPath toto ("*.*.*.*");
1173 //CVarPath toto ("[srv1,srv2].*.*.*");
1174 //CVarPath toto ("[svr1.svc1,srv2.svc2].*.*");
1175 //CVarPath toto ("[svr1.[svc1,svc2].*.var1,srv2.svc2.fe*.var2].toto");
1176 //CVarPath toto ("[svr1.svc1.*.toto,srv2.svc2.*.tata]");
1178 CUnifiedNetwork::getInstance ()->setServiceUpCallback ("AES", cbNewAESConnection);
1179 CUnifiedNetwork::getInstance ()->setServiceDownCallback ("AES", cbNewAESDisconnection);
1181 varRequestTimeout (ConfigFile.getVar ("RequestTimeout"));
1182 ConfigFile.setCallback("RequestTimeout", &varRequestTimeout);
1184 varAdminAlertAccumlationTime (ConfigFile.getVar ("AdminAlertAccumlationTime"));
1185 ConfigFile.setCallback("AdmimAlertAccumlationTime", &varAdminAlertAccumlationTime);
1189 bool update ()
1191 cleanRequest ();
1192 connectionWebUpdate ();
1194 updateSendAdminAlert ();
1195 return true;
1198 void release ()
1200 connectionWebRelease ();
1205 /// Admin Service
1206 NLNET_SERVICE_MAIN (CAdminService, "AS", "admin_service", 49996, CallbackArray, NELNS_CONFIG, NELNS_LOGS);
1209 NLMISC_COMMAND (getViewAS, "send a view and receive an array as result", "<varpath>")
1211 string cmd;
1212 for (uint i = 0; i < args.size(); i++)
1214 if (i != 0) cmd += " ";
1215 cmd += args[i];
1218 addRequest (cmd, NULL);
1220 return true;
1223 NLMISC_COMMAND (clearRequests, "clear all pending requests", "")
1225 if(args.size() != 0) return false;
1227 // for all request, set the NbWaiting to NbReceived, next cleanRequest() will send answer and clear all request
1228 for (uint i = 0 ; i < Requests.size (); i++)
1230 if (Requests[i].NbWaiting <= Requests[i].NbReceived)
1232 Requests[i].NbWaiting = Requests[i].NbReceived;
1236 return true;
1239 NLMISC_COMMAND (displayRequests, "display all pending requests", "")
1241 if(args.size() != 0) return false;
1243 log.displayNL ("Display %d pending requests", Requests.size ());
1244 for (uint i = 0 ; i < Requests.size (); i++)
1246 log.displayNL ("id: %d wait: %d recv: %d from: %s nbrow: %d", Requests[i].Id, Requests[i].NbWaiting, Requests[i].NbReceived, Requests[i].From->asString ().c_str (), Requests[i].NbRow);
1248 log.displayNL ("End of display pending requests");
1250 return true;
1253 NLMISC_COMMAND (generateAlert, "generate an alert", "<text>")
1255 if(args.size() != 1) return false;
1257 sendAdminAlert (args[0].c_str());
1259 return true;