A new doc condor_config.html is being added showing the condor configuration
[burt-test.git] / frontend / glideinFrontendMonitoring.py
blob5ddf0a572b9da1c92c02a393840d3afca62958c1
2 # Project:
3 # glideinWMS
5 # File Version:
6 # $Id: glideinFrontendMonitoring.py,v 1.11 2011/02/10 21:35:31 parag Exp $
8 # Description:
9 # This module implements the functions needed
10 # to monitor the VO frontend
12 # Author:
13 # Igor Sfiligoi (Mar 19th 2009)
16 import os,os.path
17 import re,time,copy,string,math,random,fcntl
18 import xmlFormat,timeConversion
19 import rrdSupport
21 ############################################################
23 # Configuration
25 ############################################################
class MonitoringConfig:
    """Settings and file helpers for the frontend monitoring area.

    Holds the RRD layout parameters and the directory under which all
    monitoring files are written, and offers helpers to write text files,
    create sub-directories and create/update RRD files in that directory.
    """
    def __init__(self):
        # set default values
        # user should modify if needed
        self.rrd_step=300       # default to 5 minutes
        self.rrd_heartbeat=1800 # default to 30 minutes, should be at least twice the loop time
        # Archive definitions handed to rrdSupport.create_rrd_multi();
        # presumably (consolidation function, xfiles factor, steps per row, rows)
        # as in rrdtool RRA definitions -- confirm against rrdSupport.
        # With rrd_step=300: row width = steps*5 min, time span = rows*row width.
        self.rrd_archives=[('AVERAGE',0.8,1,740),   # max precision (5 min), keep ~2.5 days
                           ('AVERAGE',0.92,12,740), # 1 h precision, keep for a month (~30 days)
                           ('AVERAGE',0.98,144,740) # 12 hour precision, keep for a year
                           ]

        # Directory under which all monitoring files are written
        self.monitor_dir="monitor/"

        self.rrd_obj=rrdSupport.rrdSupport()

        # NOTE(review): not read anywhere in this module
        self.my_name="Unknown"

    def write_file(self,relative_fname,str):
        """Write a text file under monitor_dir via a temporary file.

        relative_fname -- path relative to monitor_dir
        str            -- file content (a trailing newline is appended); the
                          parameter name is kept for backward compatibility
                          even though it shadows the builtin here

        The content goes to <fname>.tmp first and is then moved into place
        by tmp2final(), which keeps the previous version as <fname>~.
        """
        fname=os.path.join(self.monitor_dir,relative_fname)
        fd=open(fname+".tmp","w")
        try:
            fd.write(str+"\n")
        finally:
            fd.close()

        tmp2final(fname)
        return

    def establish_dir(self,relative_dname):
        """Make sure monitor_dir/relative_dname exists (parent must exist).

        Tolerates the directory being created concurrently between the
        existence check and mkdir; any other mkdir failure is re-raised.
        """
        dname=os.path.join(self.monitor_dir,relative_dname)
        if not os.path.isdir(dname):
            try:
                os.mkdir(dname)
            except OSError:
                # someone else may have created it after our check
                if not os.path.isdir(dname):
                    raise
        return

    def write_rrd_multi(self,relative_fname,ds_type,time,val_dict,min=None,max=None):
        """Create (if missing) and update a multi data-source RRD file.

        relative_fname -- path relative to monitor_dir, without ".rrd"
        ds_type        -- data source type for every DS (callers here pass "GAUGE")
        time           -- update timestamp (shadows the time module in here)
        val_dict       -- {ds_name: value}; its keys, sorted, define the data
                          sources when the file is first created
        min, max       -- DS bounds used only at creation; None is passed as 'U'

        Does nothing when rrdSupport reports a dummy backend.  Update
        failures are reported on stdout (with their cause) and otherwise
        ignored, so monitoring keeps running.
        """
        if self.rrd_obj.isDummy():
            return # nothing to do, no rrd bin no rrd creation

        for rrd_ext,rrd_archives in ((".rrd",self.rrd_archives),):
            fname=os.path.join(self.monitor_dir,relative_fname+rrd_ext)

            if not os.path.isfile(fname):
                if min is None:
                    min='U'
                if max is None:
                    max='U'
                ds_arr=[]
                for ds_name in sorted(val_dict.keys()):
                    ds_arr.append((ds_name,ds_type,self.rrd_heartbeat,min,max))
                self.rrd_obj.create_rrd_multi(fname,
                                              self.rrd_step,rrd_archives,
                                              ds_arr)

            try:
                self.rrd_obj.update_rrd_multi(fname,time,val_dict)
            except Exception:
                # best effort, but do not throw away the reason of the failure
                import sys
                print("Failed to update %s: %s"%(fname,sys.exc_info()[1]))
        return
#########################################################################################################################################
#
#  groupStats
#
#  This class collects the statistics of one frontend group (jobs, glideins,
#  and per-factory matches and requests) and writes them to XML and RRD files
#
#########################################################################################################################################
class groupStats:
    """Collect and publish the monitoring statistics of one frontend group.

    self.data has two sections:
      'factories' -- {factory: {'MatchedJobs':{...}, 'MatchedGlideins':{...},
                      'Requested':{...}, 'Attributes':{...}, 'Down':'Up'|'Down'}}
      'totals'    -- {'Jobs':{...}, 'Glideins':{...}} group-wide counters
    Every log* method refreshes self.updated; write_file() uses it both to
    rate-limit rewrites and as the RRD update timestamp.
    """
    def __init__(self):
        self.data={'factories':{},'totals':{}}
        self.updated=time.time()

        self.files_updated=None
        self.attributes={'Jobs':("Idle","OldIdle","Running","Total"),
                         'Glideins':("Idle","Running","Total"),
                         'MatchedJobs':("Idle","EffIdle","OldIdle","Running","RunningHere"),
                         'MatchedGlideins':("Total","Idle","Running"),
                         'Requested':("Idle","MaxRun")}

    def _factory_el(self,factory):
        """Return the per-factory dict, creating it on first use."""
        return self.data['factories'].setdefault(factory,{})

    def logJobs(self,jobs_data):
        """Record group-wide job counts (only keys in attributes['Jobs'])."""
        el={}
        self.data['totals']['Jobs']=el
        for k in self.attributes['Jobs']:
            if k in jobs_data:
                el[k]=jobs_data[k]
        self.updated=time.time()

    def logGlideins(self,slots_data):
        """Record group-wide glidein counts (only keys in attributes['Glideins'])."""
        el={}
        self.data['totals']['Glideins']=el
        for k in self.attributes['Glideins']:
            if k in slots_data:
                el[k]=slots_data[k]
        self.updated=time.time()

    def logMatchedJobs(self, factory, idle, effIdle, oldIdle, running, realRunning):
        """Record the jobs matched to factory (realRunning is stored as 'RunningHere')."""
        attrs=self.attributes['MatchedJobs']
        self._factory_el(factory)['MatchedJobs']=dict(zip(attrs,
                                                          (idle,effIdle,oldIdle,running,realRunning)))
        # was "self.update" (typo), which left the timestamp stale
        self.updated=time.time()

    def logFactDown(self, factory, isDown):
        """Record whether factory is 'Down' or 'Up'."""
        if isDown:
            state='Down'
        else:
            state='Up'
        self._factory_el(factory)['Down']=state
        self.updated=time.time()

    def logMatchedGlideins(self, factory, total, idle, running):
        """Record the glideins of factory matched to this group."""
        attrs=self.attributes['MatchedGlideins']
        self._factory_el(factory)['MatchedGlideins']=dict(zip(attrs,(total,idle,running)))
        # was "self.update" (typo), which left the timestamp stale
        self.updated=time.time()

    def logFactAttrs(self, factory, attrs, blacklist):
        """Record the attributes of factory, skipping names listed in blacklist."""
        el={}
        for attr in attrs:
            if not attr in blacklist:
                el[attr]=attrs[attr]
        self._factory_el(factory)['Attributes']=el
        # was "self.update" (typo), which left the timestamp stale
        self.updated=time.time()

    def logFactReq(self, factory, reqIdle, reqMaxRun, params):
        """Record the request sent to factory; params is deep-copied."""
        self._factory_el(factory)['Requested']={self.attributes['Requested'][0]: reqIdle,
                                                 self.attributes['Requested'][1]: reqMaxRun,
                                                 'Parameters': copy.deepcopy(params)}
        self.updated=time.time()

    def get_data(self):
        """Return a deep copy of the per-factory data."""
        return copy.deepcopy(self.data['factories'])

    def get_xml_data(self,indent_tab=xmlFormat.DEFAULT_TAB,leading_tab=""):
        data=self.get_data()
        return xmlFormat.dict2string(data,
                                     dict_name='factories', el_name='factory',
                                     subtypes_params={"class":{'subclass_params':{'Requested':{'dicts_params':{'Parameters':{'el_name':'Parameter'}}}}}},
                                     indent_tab=indent_tab,leading_tab=leading_tab)

    def get_updated(self):
        return self.updated

    def get_xml_updated(self,indent_tab=xmlFormat.DEFAULT_TAB,leading_tab=""):
        """Return self.updated as XML, in UTC and local time formats."""
        xml_updated={"UTC":{"unixtime":timeConversion.getSeconds(self.updated),
                            "ISO8601":timeConversion.getISO8601_UTC(self.updated),
                            "RFC2822":timeConversion.getRFC2822_UTC(self.updated)},
                     "Local":{"ISO8601":timeConversion.getISO8601_Local(self.updated),
                              "RFC2822":timeConversion.getRFC2822_Local(self.updated),
                              "human":timeConversion.getHuman(self.updated)}}
        return xmlFormat.dict2string(xml_updated,
                                     dict_name="updated",el_name="timezone",
                                     subtypes_params={"class":{}},
                                     indent_tab=indent_tab,leading_tab=leading_tab)

    def get_total(self):
        """Return per-class sums over all factories plus the group-wide totals.

        For 'MatchedJobs', 'Requested' and 'MatchedGlideins', an attribute is
        kept (and summed) only if it is a plain int in every factory that
        reports that class.  Classes no factory reported are omitted.  The
        entries of self.data['totals'] ('Jobs', 'Glideins') are added as
        deep copies.
        """
        total={'MatchedJobs':None,'Requested':None,'MatchedGlideins':None}

        for fa in self.data['factories'].values():
            for w in fa.keys():
                if not w in total: # ignore eventual not supported classes
                    continue
                el=fa[w]
                tel=total[w]

                if tel is None:
                    # first one, just copy over the numbers
                    tel={}
                    total[w]=tel
                    for a in el.keys():
                        if type(el[a])==type(1):
                            tel[a]=el[a]
                else:
                    # successive, sum; attributes previous factories lacked are ignored
                    for a in el.keys():
                        if type(el[a])==type(1) and a in tel:
                            tel[a]+=el[a]
                    # drop attributes this factory lacks or has as non-number
                    # (iterate a snapshot, since we delete while looping)
                    for a in list(tel.keys()):
                        if (not a in el) or type(el[a])!=type(1):
                            del tel[a]

        for w in list(total.keys()):
            if total[w] is None:
                del total[w] # remove entry if not defined

        total.update(copy.deepcopy(self.data['totals']))
        return total

    def get_xml_total(self,indent_tab=xmlFormat.DEFAULT_TAB,leading_tab=""):
        total=self.get_total()
        return xmlFormat.class2string(total,
                                      inst_name="total",
                                      indent_tab=indent_tab,leading_tab=leading_tab)

    def write_file(self):
        """Write frontend_status.xml and update total/Status_Attributes.rrd.

        Skipped when self.updated is less than 5 seconds newer than the
        data timestamp of the previous write.  Uses the module-level
        monitoringConfig for all file operations.
        """
        if (self.files_updated is not None) and ((self.updated-self.files_updated)<5):
            # files updated recently, no need to redo it
            return

        # write snapshot file
        xml_str=('<?xml version="1.0" encoding="ISO-8859-1"?>\n\n'+
                 '<VOFrontendGroupStats>\n'+
                 self.get_xml_updated(indent_tab=xmlFormat.DEFAULT_TAB,leading_tab=xmlFormat.DEFAULT_TAB)+"\n"+
                 self.get_xml_data(indent_tab=xmlFormat.DEFAULT_TAB,leading_tab=xmlFormat.DEFAULT_TAB)+"\n"+
                 self.get_xml_total(indent_tab=xmlFormat.DEFAULT_TAB,leading_tab=xmlFormat.DEFAULT_TAB)+"\n"+
                 "</VOFrontendGroupStats>\n")

        monitoringConfig.write_file("frontend_status.xml",xml_str)

        total_el=self.get_total()
        # update RRDs; DS names are <type_string><attribute>
        type_strings={'Jobs':'Jobs','Glideins':'Glidein','MatchedJobs':'MatchJob',
                      'MatchedGlideins':'MatchGlidein','Requested':'Req'}

        # init, so that all get created properly
        val_dict={}
        for tp in self.attributes.keys():
            tp_str=type_strings[tp]
            for a in self.attributes[tp]:
                val_dict["%s%s"%(tp_str,a)]=None

        for tp in total_el.keys():
            # type - Jobs, Glideins, MatchedJobs, MatchedGlideins or Requested
            if not tp in self.attributes:
                continue

            tp_str=type_strings[tp]
            attributes_tp=self.attributes[tp]
            fe_el_tp=total_el[tp]
            for a in fe_el_tp.keys():
                if a in attributes_tp:
                    a_el=fe_el_tp[a]
                    if type(a_el)!=type({}): # ignore subdictionaries
                        val_dict["%s%s"%(tp_str,a)]=a_el

        monitoringConfig.establish_dir("total")
        monitoringConfig.write_rrd_multi("total/Status_Attributes",
                                         "GAUGE",self.updated,val_dict)

        self.files_updated=self.updated
        return
329 ########################################################################
class factoryStats:
    """Collect and publish per-client (frontend) statistics.

    self.data is {client_name: {'Status':{...}, 'Requested':{...},
    'ClientMonitor':{...}}}, filled by logJobs, logRequest and
    logClientMonitor respectively.

    NOTE(review): the XML root tag and file name written by write_file()
    (glideFactoryEntryQStats / schedd_status.xml) look inherited from the
    factory-side monitoring code -- confirm they are intended here.
    """
    def __init__(self):
        self.data={}
        self.updated=time.time()

        self.files_updated=None
        # RRD data sources, per class.  Keys and names mirror what logJobs
        # ('Status'), logRequest ('Requested') and logClientMonitor
        # ('ClientMonitor') store, and every key must have an entry in
        # write_file's type_strings.  (The previous Jobs/Matched/Slots keys
        # matched neither, so write_file always died with a KeyError.)
        self.attributes={'Status':("Idle","Running","Held","Wait","Pending","StageIn","IdleOther","StageOut"),
                         'Requested':("Idle","MaxRun"),
                         'ClientMonitor':("InfoAge","JobsIdle","JobsRunning","GlideIdle","GlideRunning","GlideTotal")}

    def _client_el(self,client_name):
        """Return the per-client dict, creating it on first use."""
        return self.data.setdefault(client_name,{})

    def logJobs(self,client_name,qc_status):
        """Record job counts by numeric status code for client_name.

        qc_status maps the status codes listed in status_pairs to counts;
        codes missing from it are recorded as 0.
        """
        el={}
        self._client_el(client_name)['Status']=el

        status_pairs=((1,"Idle"), (2,"Running"), (5,"Held"), (1001,"Wait"),(1002,"Pending"),(1010,"StageIn"),(1100,"IdleOther"),(4010,"StageOut"))
        for nr,status_name in status_pairs:
            if nr in qc_status:
                el[status_name]=qc_status[nr]
            else:
                el[status_name]=0
        self.updated=time.time()

    def logRequest(self,client_name,requests,params):
        """Record the request of client_name.

        requests is a dictionary of requests
        params is a dictionary of parameters (deep-copied into 'Parameters')

        At the moment, it looks only for
          'IdleGlideins'       (stored as 'Idle')
          'MaxRunningGlideins' (stored as 'MaxRun')
        """
        el={}
        self._client_el(client_name)['Requested']=el

        if 'IdleGlideins' in requests:
            el['Idle']=requests['IdleGlideins']
        if 'MaxRunningGlideins' in requests:
            el['MaxRun']=requests['MaxRunningGlideins']

        el['Parameters']=copy.deepcopy(params)

        self.updated=time.time()

    def logClientMonitor(self,client_name,client_monitor,client_internals):
        """Record the monitoring info reported by client_name.

        client_monitor is a dictionary of monitoring info
        client_internals is a dictionary of internals

        At the moment, it looks only for
          'Idle', 'Running', 'GlideinsIdle', 'GlideinsRunning', 'GlideinsTotal'
            (stored as JobsIdle, JobsRunning, GlideIdle, GlideRunning, GlideTotal)
          'LastHeardFrom' (converted to InfoAge, seconds since then)
        """
        el={}
        self._client_el(client_name)['ClientMonitor']=el

        for ck,ek in (('Idle','JobsIdle'),('Running','JobsRunning'),('GlideinsIdle','GlideIdle'),('GlideinsRunning','GlideRunning'),('GlideinsTotal','GlideTotal')):
            if ck in client_monitor:
                el[ek]=client_monitor[ck]

        if 'LastHeardFrom' in client_internals:
            el['InfoAge']=int(time.time()-long(client_internals['LastHeardFrom']))
            el['InfoAgeAvgCounter']=1 # used for totals since we need an avg in totals, not absnum

        self.updated=time.time()

    def get_data(self):
        """Return a deep copy of self.data without the internal *AvgCounter fields."""
        data1=copy.deepcopy(self.data)
        for fe in data1.values():
            for el in fe.values():
                # snapshot the keys, since we delete while looping
                for a in list(el.keys()):
                    if a.endswith('AvgCounter'): # do not publish avgcounter fields... they are internals
                        del el[a]
        return data1

    def get_xml_data(self,indent_tab=xmlFormat.DEFAULT_TAB,leading_tab=""):
        data=self.get_data()
        return xmlFormat.dict2string(data,
                                     dict_name="frontends",el_name="frontend",
                                     subtypes_params={"class":{'subclass_params':{'Requested':{'dicts_params':{'Parameters':{'el_name':'Parameter'}}}}}},
                                     indent_tab=indent_tab,leading_tab=leading_tab)

    def get_total(self):
        """Return per-class totals over all clients.

        An attribute is kept (and summed) only if it is a plain int in every
        client that reports that class.  For each <X>AvgCounter attribute,
        X is replaced by X // <X>AvgCounter (an average instead of a sum),
        and the counter itself is removed.  Classes no client reported are
        omitted.
        """
        total={'Status':None,'Requested':None,'ClientMonitor':None}

        for fe in self.data.values():
            for w in fe.keys():
                if not w in total: # ignore eventual not supported classes
                    continue
                el=fe[w]
                tel=total[w]

                if tel is None:
                    # first one, just copy over the numbers
                    tel={}
                    total[w]=tel
                    for a in el.keys():
                        if type(el[a])==type(1):
                            tel[a]=el[a]
                else:
                    # successive, sum; attributes previous clients lacked are ignored
                    for a in el.keys():
                        if type(el[a])==type(1) and a in tel:
                            tel[a]+=el[a]
                    # drop attributes this client lacks or has as non-number
                    for a in list(tel.keys()):
                        if (not a in el) or type(el[a])!=type(1):
                            del tel[a]

        for w in list(total.keys()):
            tel=total[w]
            if tel is None:
                del total[w] # remove entry if not defined
                continue
            for a in list(tel.keys()):
                if a.endswith('AvgCounter'):
                    # this is an average counter, calc the average of the referred element
                    # like InfoAge=InfoAge/InfoAgeAvgCounter (integer division)
                    aorg=a[:-len('AvgCounter')]
                    if aorg in tel and tel[a]:
                        tel[aorg]=tel[aorg]//tel[a]
                    # the avgcount totals are just for internal purposes
                    del tel[a]

        return total

    def get_xml_total(self,indent_tab=xmlFormat.DEFAULT_TAB,leading_tab=""):
        total=self.get_total()
        return xmlFormat.class2string(total,
                                      inst_name="total",
                                      indent_tab=indent_tab,leading_tab=leading_tab)

    def get_updated(self):
        return self.updated

    def get_xml_updated(self,indent_tab=xmlFormat.DEFAULT_TAB,leading_tab=""):
        """Return self.updated as XML, in UTC and local time formats."""
        xml_updated={"UTC":{"unixtime":timeConversion.getSeconds(self.updated),
                            "ISO8601":timeConversion.getISO8601_UTC(self.updated),
                            "RFC2822":timeConversion.getRFC2822_UTC(self.updated)},
                     "Local":{"ISO8601":timeConversion.getISO8601_Local(self.updated),
                              "RFC2822":timeConversion.getRFC2822_Local(self.updated),
                              "human":timeConversion.getHuman(self.updated)}}
        return xmlFormat.dict2string(xml_updated,
                                     dict_name="updated",el_name="timezone",
                                     subtypes_params={"class":{}},
                                     indent_tab=indent_tab,leading_tab=leading_tab)

    def write_file(self):
        """Write schedd_status.xml and one Status_Attributes RRD per client plus total.

        Skipped when self.updated is less than 5 seconds newer than the data
        timestamp of the previous write.  RRDs go to "total/" and to
        "frontend_<client_name>/" under the module-level monitoringConfig.
        """
        if (self.files_updated is not None) and ((self.updated-self.files_updated)<5):
            # files updated recently, no need to redo it
            return

        # write snapshot file
        xml_str=('<?xml version="1.0" encoding="ISO-8859-1"?>\n\n'+
                 '<glideFactoryEntryQStats>\n'+
                 self.get_xml_updated(indent_tab=xmlFormat.DEFAULT_TAB,leading_tab=xmlFormat.DEFAULT_TAB)+"\n"+
                 self.get_xml_data(indent_tab=xmlFormat.DEFAULT_TAB,leading_tab=xmlFormat.DEFAULT_TAB)+"\n"+
                 self.get_xml_total(indent_tab=xmlFormat.DEFAULT_TAB,leading_tab=xmlFormat.DEFAULT_TAB)+"\n"+
                 "</glideFactoryEntryQStats>\n")
        monitoringConfig.write_file("schedd_status.xml",xml_str)

        data=self.get_data()
        total_el=self.get_total()

        # update RRDs; DS names are <type_string><attribute>
        type_strings={'Status':'Status','Requested':'Req','ClientMonitor':'Client'}
        for fe in [None]+list(data.keys()):
            if fe is None: # special key == Total
                fe_dir="total"
                fe_el=total_el
            else:
                fe_dir="frontend_"+fe
                fe_el=data[fe]

            # init, so that all get created properly
            val_dict={}
            for tp in self.attributes.keys():
                tp_str=type_strings[tp]
                for a in self.attributes[tp]:
                    val_dict["%s%s"%(tp_str,a)]=None

            monitoringConfig.establish_dir(fe_dir)
            for tp in fe_el.keys():
                # type - Status, Requested or ClientMonitor
                if not tp in self.attributes:
                    continue

                tp_str=type_strings[tp]
                attributes_tp=self.attributes[tp]
                fe_el_tp=fe_el[tp]
                for a in fe_el_tp.keys():
                    if a in attributes_tp:
                        a_el=fe_el_tp[a]
                        if type(a_el)!=type({}): # ignore subdictionaries
                            val_dict["%s%s"%(tp_str,a)]=a_el

            monitoringConfig.write_rrd_multi("%s/Status_Attributes"%fe_dir,
                                             "GAUGE",self.updated,val_dict)

        self.files_updated=self.updated
        return
572 ############### P R I V A T E ################
574 ##################################################
def tmp2final(fname):
    """Move fname+".tmp" into place as fname, keeping the old file as fname+"~".

    Steps, each best effort:
      1. remove a stale fname~ backup, if any;
      2. rename the current fname to fname~, if it exists;
      3. rename fname.tmp to fname; a failure here is reported on stdout.
    NOTE: between steps 2 and 3 fname briefly does not exist.

    Only OSError (what os.remove/os.rename raise on failure) is handled;
    anything else, e.g. KeyboardInterrupt, now propagates instead of being
    silently swallowed by a bare except.
    """
    try:
        os.remove(fname+"~")
    except OSError:
        pass # e.g. no previous backup

    try:
        os.rename(fname,fname+"~")
    except OSError:
        pass # e.g. no previous version (first write)

    try:
        os.rename(fname+".tmp",fname)
    except OSError:
        print("Failed renaming %s.tmp into %s"%(fname,fname))
    return
##################################################
#
# Global configuration of the module: the single MonitoringConfig instance
# through which groupStats.write_file() and factoryStats.write_file() write
# their XML and RRD files.
#
monitoringConfig = MonitoringConfig()