6 # $Id: glideinFrontendMonitoring.py,v 1.11 2011/02/10 21:35:31 parag Exp $
9 # This module implements the functions needed
10 # to monitor the VO frontend
13 # Igor Sfiligoi (Mar 19th 2009)
17 import re
,time
,copy
,string
,math
,random
,fcntl
18 import xmlFormat
,timeConversion
21 ############################################################
25 ############################################################
class MonitoringConfig:
    """Settings and file helpers shared by the monitoring classes of this module.

    NOTE(review): the text this class was rebuilt from had lost several
    statements (the __init__ header, the write/close in write_file, the
    directory creation in establish_dir, the min/max defaults and the
    try/except in write_rrd_multi).  Every such spot is marked
    "reconstructed" below and should be checked against version control.

    Attributes (all may be overridden by the user after construction):
      rrd_step      -- 300, i.e. 5 minutes
      rrd_heartbeat -- 1800, i.e. 30 minutes; should be at least twice the
                       loop time
      rrd_archives  -- archive tuples handed to rrd_obj.create_rrd_multi
      monitor_dir   -- directory every relative name is joined to
      rrd_obj       -- rrdSupport.rrdSupport() instance
      my_name       -- "Unknown" by default
    """

    def __init__(self):
        # user should modify if needed
        self.rrd_step = 300        # default to 5 minutes
        self.rrd_heartbeat = 1800  # default to 30 minutes, should be at least twice the loop time
        self.rrd_archives = [
            ('AVERAGE', 0.8, 1, 740),     # max precision, keep 2.5 days
            ('AVERAGE', 0.92, 12, 740),   # 1 h precision, keep for a month (30 days)
            ('AVERAGE', 0.98, 144, 740),  # 12 hour precision, keep for a year
        ]

        # Directory that write_file/establish_dir/write_rrd_multi resolve
        # their relative names against.
        self.monitor_dir = "monitor/"

        self.rrd_obj = rrdSupport.rrdSupport()

        self.my_name = "Unknown"

    def write_file(self, relative_fname, str):
        """Write `str` to monitor_dir/relative_fname by way of a ".tmp" file.

        The parameter keeps its historical name `str` (it shadows the
        builtin inside this method only) so keyword callers keep working.
        """
        fname = os.path.join(self.monitor_dir, relative_fname)
        fd = open(fname + ".tmp", "w")
        try:
            # reconstructed -- NOTE(review): confirm the trailing newline
            fd.write(str + "\n")
        finally:
            # close even if the write fails, so the descriptor is not leaked
            fd.close()

        # reconstructed: tmp2final() moves "<fname>.tmp" over "<fname>"
        tmp2final(fname)

    def establish_dir(self, relative_dname):
        """Make sure monitor_dir/relative_dname exists as a directory."""
        dname = os.path.join(self.monitor_dir, relative_dname)
        if not os.path.isdir(dname):
            os.mkdir(dname)  # reconstructed

    def write_rrd_multi(self, relative_fname, ds_type, time, val_dict, min=None, max=None):
        """Create a RRD file, using rrdtool, then update it.

        relative_fname -- file name relative to monitor_dir, without ".rrd"
        ds_type        -- data source type given to every data source
        time           -- timestamp passed to update_rrd_multi
        val_dict       -- data source name -> value
        min, max       -- limits recorded for every data source on creation

        Does nothing when self.rrd_obj.isDummy() is true.  A failing update
        is reported on stdout and otherwise ignored.
        """
        if self.rrd_obj.isDummy():
            return  # nothing to do, no rrd bin no rrd creation

        for rrd_ext, rrd_archives in ((".rrd", self.rrd_archives),):
            fname = os.path.join(self.monitor_dir, relative_fname + rrd_ext)

            if not os.path.isfile(fname):
                # reconstructed -- NOTE(review): 'U' is assumed to be the
                # "no limit" marker expected by rrdSupport; confirm.
                if min is None:
                    min = 'U'
                if max is None:
                    max = 'U'

                # sorted so the data sources are always created in one order
                ds_arr = [(ds_name, ds_type, self.rrd_heartbeat, min, max)
                          for ds_name in sorted(val_dict.keys())]
                self.rrd_obj.create_rrd_multi(fname,
                                              self.rrd_step, rrd_archives,
                                              ds_arr)

            try:
                self.rrd_obj.update_rrd_multi(fname, time, val_dict)
            except Exception:
                # best effort: monitoring must not take the caller down
                print("Failed to update %s" % fname)
99 #########################################################################################################################################
103 # This class handles the data obtained from condor_q
105 #########################################################################################################################################
109 self
.data
={'factories':{},'totals':{}}
110 self
.updated
=time
.time()
112 self
.files_updated
=None
113 self
.attributes
={'Jobs':("Idle","OldIdle","Running","Total"),
114 'Glideins':("Idle","Running","Total"),
115 'MatchedJobs':("Idle","EffIdle","OldIdle","Running","RunningHere"),
116 'MatchedGlideins':("Total","Idle","Running"),
117 'Requested':("Idle","MaxRun")}
def logJobs(self, jobs_data):
    """Store the job counters of `jobs_data` as self.data['totals']['Jobs'].

    Only the keys named in self.attributes['Jobs'] are copied; anything
    else in jobs_data is ignored.  self.updated is set to the current time.
    """
    # The text under review used `el` without binding it; a fresh dict per
    # call is the only reading consistent with the assignment below.
    el = dict((k, jobs_data[k])
              for k in self.attributes['Jobs'] if k in jobs_data)
    self.data['totals']['Jobs'] = el

    self.updated = time.time()
def logGlideins(self, slots_data):
    """Store the slot counters of `slots_data` as self.data['totals']['Glideins'].

    Only the keys named in self.attributes['Glideins'] are copied.
    self.updated is set to the current time.
    """
    # `el` had no visible binding in the text under review; it is rebuilt
    # from scratch on every call.
    wanted = self.attributes['Glideins']
    el = dict((k, slots_data[k]) for k in wanted if k in slots_data)
    self.data['totals']['Glideins'] = el

    self.updated = time.time()
def logMatchedJobs(self, factory, idle, effIdle, oldIdle, running, realRunning):
    """Record the matched-job counters of one factory.

    The five values are stored under self.data['factories'][factory]
    ['MatchedJobs'], keyed by the first five names of
    self.attributes['MatchedJobs'] in order.  The factory entry is created
    when it does not exist yet; its other keys are left alone.
    """
    names = self.attributes['MatchedJobs']
    entry = self.data['factories'].setdefault(factory, {})
    entry['MatchedJobs'] = {names[0]: idle,
                            names[1]: effIdle,
                            names[2]: oldIdle,
                            names[3]: running,
                            names[4]: realRunning}

    # Was `self.update = ...`: that created a stray attribute and left the
    # real timestamp (self.updated, read by the writers) untouched.
    self.updated = time.time()
def logFactDown(self, factory, isDown):
    """Record whether `factory` is down.

    Sets self.data['factories'][factory]['Down'] to the string 'Down' when
    isDown is true and to 'Up' otherwise, creating the factory entry when
    needed, and refreshes self.updated.
    """
    entry = self.data['factories'].setdefault(factory, {})

    # The branch selecting between the two assignments was missing from the
    # text under review (run back to back they would always yield 'Up').
    if isDown:
        entry['Down'] = 'Down'
    else:
        entry['Down'] = 'Up'

    self.updated = time.time()
def logMatchedGlideins(self, factory, total, idle, running):
    """Record the matched-glidein counters of one factory.

    The three values are stored under self.data['factories'][factory]
    ['MatchedGlideins'], keyed by the first three names of
    self.attributes['MatchedGlideins'] in order.  The factory entry is
    created when it does not exist yet.
    """
    names = self.attributes['MatchedGlideins']
    entry = self.data['factories'].setdefault(factory, {})
    entry['MatchedGlideins'] = {names[0]: total,
                                names[1]: idle,
                                names[2]: running}

    # Was `self.update = ...`, which never refreshed the timestamp that
    # the rest of the class reads (self.updated).
    self.updated = time.time()
def logFactAttrs(self, factory, attrs, blacklist):
    """Record the attributes of one factory, minus the blacklisted ones.

    self.data['factories'][factory]['Attributes'] is replaced by a new
    dict holding every key of `attrs` that is not in `blacklist`.  Values
    are stored as they are (no copy is made).
    """
    entry = self.data['factories'].setdefault(factory, {})

    # The loop header binding `attr` was missing from the text under
    # review; iterating over the keys of `attrs` is the only reading that
    # fits the `attrs[attr]` lookup.
    entry['Attributes'] = dict((attr, attrs[attr])
                               for attr in attrs if attr not in blacklist)

    # Was `self.update = ...` (typo): the timestamp lives in self.updated.
    self.updated = time.time()
def logFactReq(self, factory, reqIdle, reqMaxRun, params):
    """Record what was requested from one factory.

    Stores under self.data['factories'][factory]['Requested'] a dict with
    reqIdle and reqMaxRun keyed by the first two names of
    self.attributes['Requested'], plus a deep copy of `params` under
    'Parameters' (so later changes to the caller's dict do not leak in).
    """
    names = self.attributes['Requested']
    entry = self.data['factories'].setdefault(factory, {})
    entry['Requested'] = {names[0]: reqIdle,
                          names[1]: reqMaxRun,
                          'Parameters': copy.deepcopy(params)}

    self.updated = time.time()
202 return copy
.deepcopy(self
.data
['factories'])
def get_xml_data(self, indent_tab=xmlFormat.DEFAULT_TAB, leading_tab=""):
    """Return the per-factory data formatted by xmlFormat.dict2string.

    The dictionary is passed with dict_name='factories' and
    el_name='factory'; the 'Parameters' sub-dictionary of every
    'Requested' class is given el_name 'Parameter'.
    """
    # NOTE(review): the statement binding `data` was missing from the text
    # under review.  A private deep copy of self.data['factories'] is
    # assumed, so the formatter never sees the live dictionary -- confirm.
    data = copy.deepcopy(self.data['factories'])
    return xmlFormat.dict2string(
        data,
        dict_name='factories', el_name='factory',
        subtypes_params={"class": {'subclass_params': {'Requested': {'dicts_params': {'Parameters': {'el_name': 'Parameter'}}}}}},
        indent_tab=indent_tab, leading_tab=leading_tab)
def get_updated(self):
    """Return the time of the last change (self.updated)."""
    # The body was missing from the text under review; the accessor name
    # and the self.updated attribute written by every log* method leave
    # this as the only sensible content.
    return self.updated
def get_xml_updated(self, indent_tab=xmlFormat.DEFAULT_TAB, leading_tab=""):
    """Return self.updated formatted by xmlFormat.dict2string.

    The timestamp is converted with the timeConversion helpers into a
    "UTC" group (unixtime, ISO8601, RFC2822) and a "Local" group (ISO8601,
    RFC2822, human) and passed on with dict_name="updated" and
    el_name="timezone".
    """
    stamp = self.updated
    utc_forms = {"unixtime": timeConversion.getSeconds(stamp),
                 "ISO8601": timeConversion.getISO8601_UTC(stamp),
                 "RFC2822": timeConversion.getRFC2822_UTC(stamp)}
    local_forms = {"ISO8601": timeConversion.getISO8601_Local(stamp),
                   "RFC2822": timeConversion.getRFC2822_Local(stamp),
                   "human": timeConversion.getHuman(stamp)}
    return xmlFormat.dict2string({"UTC": utc_forms, "Local": local_forms},
                                 dict_name="updated", el_name="timezone",
                                 subtypes_params={"class": {}},
                                 indent_tab=indent_tab, leading_tab=leading_tab)
230 total
={'MatchedJobs':None,'Requested':None,'MatchedGlideins':None}
232 for f
in self
.data
['factories'].keys():
233 fa
=self
.data
['factories'][f
]
235 if total
.has_key(w
): # ignore eventual not supported classes
240 # first one, just copy over
244 if type(el
[a
])==type(1): # copy only numbers
249 if type(el
[a
])==type(1): # consider only numbers
252 # if other frontends didn't have this attribute, ignore
253 # if any attribute from prev. frontends are not in the current one, remove from total
255 if not el
.has_key(a
):
257 elif type(el
[a
])!=type(1):
260 for w
in total
.keys():
262 del total
[w
] # remove entry if not defined
264 total
.update(copy
.deepcopy(self
.data
['totals']))
def get_xml_total(self, indent_tab=xmlFormat.DEFAULT_TAB, leading_tab=""):
    """Return the result of self.get_total() formatted by xmlFormat.class2string."""
    total = self.get_total()
    # NOTE(review): one argument line between `total,` and `indent_tab=`
    # was missing from the text under review.  "total" as the instance
    # name is an assumption -- confirm against xmlFormat.class2string.
    return xmlFormat.class2string(total,
                                  "total",
                                  indent_tab=indent_tab, leading_tab=leading_tab)
def write_file(self):
    """Write the status snapshot and update the RRD totals.

    Steps, as far as the visible code shows:
      1. skip everything when the files were written less than 5 seconds
         (in self.updated terms) before the current state;
      2. write "frontend_status.xml" through monitoringConfig.write_file;
      3. build one value per "<type prefix><attribute>" from
         self.get_total() and pass them to monitoringConfig.write_rrd_multi
         as "total/Status_Attributes";
      4. remember self.updated in self.files_updated.

    NOTE(review): the early return, the val_dict initialisation, the header
    of the second loop, its `continue` and the `a_el` lookup were missing
    from the text under review and are reconstructed here.
    """
    if (self.files_updated is not None) and ((self.updated - self.files_updated) < 5):
        # files updated recently, no need to redo it
        return

    # write snapshot file
    tab = xmlFormat.DEFAULT_TAB
    xml_str = ('<?xml version="1.0" encoding="ISO-8859-1"?>\n\n' +
               '<VOFrontendGroupStats>\n' +
               self.get_xml_updated(indent_tab=tab, leading_tab=tab) + "\n" +
               self.get_xml_data(indent_tab=tab, leading_tab=tab) + "\n" +
               self.get_xml_total(indent_tab=tab, leading_tab=tab) + "\n" +
               "</VOFrontendGroupStats>\n")

    monitoringConfig.write_file("frontend_status.xml", xml_str)

    # update RRDs
    total_el = self.get_total()
    type_strings = {'Jobs': 'Jobs', 'Glideins': 'Glidein', 'MatchedJobs': 'MatchJob',
                    'MatchedGlideins': 'MatchGlidein', 'Requested': 'Req'}

    # init, so that all get created properly
    val_dict = {}
    for tp in self.attributes:
        tp_str = type_strings[tp]
        for a in self.attributes[tp]:
            val_dict["%s%s" % (tp_str, a)] = None

    for tp in total_el:
        if tp not in self.attributes:
            continue  # not a type we keep RRD data for

        tp_str = type_strings[tp]
        attributes_tp = self.attributes[tp]

        fe_el_tp = total_el[tp]
        for a in fe_el_tp:
            if a in attributes_tp:
                a_el = fe_el_tp[a]
                if not isinstance(a_el, dict):  # ignore subdictionaries
                    val_dict["%s%s" % (tp_str, a)] = a_el

    monitoringConfig.establish_dir("total")
    monitoringConfig.write_rrd_multi("total/Status_Attributes",
                                     "GAUGE", self.updated, val_dict)

    self.files_updated = self.updated
329 ########################################################################
334 self
.updated
=time
.time()
336 self
.files_updated
=None
337 self
.attributes
={'Jobs':("Idle","OldIdle","Running","Total"),
338 'Matched':("Idle","OldIdle","Running","Total"),
339 'Requested':("Idle","MaxRun"),
340 'Slots':("Idle","Running","Total")}
def logJobs(self, client_name, qc_status):
    """Record the job status counters reported for one client.

    qc_status maps numeric status codes to counts.  The codes listed in
    status_pairs below are stored under their names; every other code is
    ignored.  The entry of `client_name` in self.data is created when
    missing, and self.updated is refreshed.
    """
    t_el = self.data.setdefault(client_name, {})

    # NOTE(review): the statements creating `el` and attaching it to t_el
    # were missing from the text under review.  The 'Status' key is taken
    # from the type names used elsewhere in this class -- confirm.
    el = {}
    t_el['Status'] = el

    status_pairs = ((1, "Idle"), (2, "Running"), (5, "Held"), (1001, "Wait"), (1002, "Pending"), (1010, "StageIn"), (1100, "IdleOther"), (4010, "StageOut"))
    # the name half of each pair used to be unpacked into a local called
    # `str`, hiding the builtin; it is now `label`
    for nr, label in status_pairs:
        if nr in qc_status:
            el[label] = qc_status[nr]

    self.updated = time.time()
def logRequest(self, client_name, requests, params):
    """
    requests is a dictionary of requests
    params is a dictionary of parameters

    At the moment, it looks only for
      'IdleGlideins'
      'MaxRunningGlideins'

    They are stored as 'Idle' and 'MaxRun'; a deep copy of params is
    stored next to them as 'Parameters'.
    """
    t_el = self.data.setdefault(client_name, {})

    # NOTE(review): the statements creating `el` and attaching it to t_el
    # were missing from the text under review.  The 'Requested' key matches
    # the subclass name used by get_xml_data for 'Parameters' -- confirm.
    el = {}
    t_el['Requested'] = el

    if 'IdleGlideins' in requests:
        el['Idle'] = requests['IdleGlideins']
    if 'MaxRunningGlideins' in requests:
        el['MaxRun'] = requests['MaxRunningGlideins']

    el['Parameters'] = copy.deepcopy(params)

    self.updated = time.time()
def logClientMonitor(self, client_name, client_monitor, client_internals):
    """
    client_monitor is a dictionary of monitoring info
    client_internals is a dictionary of internals

    At the moment, it looks only for
      the keys paired up in the loop below (from client_monitor)
      'LastHeardFrom' (from client_internals)

    The result replaces self.data[client_name]['ClientMonitor'].
    """
    t_el = self.data.setdefault(client_name, {})

    el = {}
    t_el['ClientMonitor'] = el

    # NOTE(review): the statement unpacking each pair was missing from the
    # text under review.  (client key, stored key) order is assumed here --
    # confirm against the original.
    for ck, ek in (('Idle', 'JobsIdle'), ('Running', 'JobsRunning'), ('GlideinsIdle', 'GlideIdle'), ('GlideinsRunning', 'GlideRunning'), ('GlideinsTotal', 'GlideTotal')):
        if ck in client_monitor:
            el[ek] = client_monitor[ck]

    if 'LastHeardFrom' in client_internals:
        # int() instead of the Python-2-only long(): same result, and int
        # has been unbounded since the int/long unification
        el['InfoAge'] = int(time.time() - int(client_internals['LastHeardFrom']))
        el['InfoAgeAvgCounter'] = 1  # used for totals since we need an avg in totals, not absnum

    self.updated = time.time()
423 data1
=copy
.deepcopy(self
.data
)
424 for f
in data1
.keys():
429 if a
[-10:]=='AvgCounter': # do not publish avgcounter fields... they are internals
def get_xml_data(self, indent_tab=xmlFormat.DEFAULT_TAB, leading_tab=""):
    """Return the per-frontend data formatted by xmlFormat.dict2string.

    The dictionary is passed with dict_name="frontends" and
    el_name="frontend"; the 'Parameters' sub-dictionary of every
    'Requested' class is given el_name 'Parameter'.
    """
    # NOTE(review): the statement binding `data` was missing from the text
    # under review.  It is assumed to come from the accessor that deep
    # copies self.data and drops the internal "...AvgCounter" fields, and
    # that this accessor is named get_data -- confirm both.
    data = self.get_data()
    return xmlFormat.dict2string(
        data,
        dict_name="frontends", el_name="frontend",
        subtypes_params={"class": {'subclass_params': {'Requested': {'dicts_params': {'Parameters': {'el_name': 'Parameter'}}}}}},
        indent_tab=indent_tab, leading_tab=leading_tab)
442 total
={'Status':None,'Requested':None,'ClientMonitor':None}
444 for f
in self
.data
.keys():
447 if total
.has_key(w
): # ignore eventual not supported classes
452 # first one, just copy over
456 if type(el
[a
])==type(1): # copy only numbers
461 if type(el
[a
])==type(1): # consider only numbers
464 # if other frontends didn't have this attribute, ignore
465 # if any attribute from prev. frontends are not in the current one, remove from total
467 if not el
.has_key(a
):
469 elif type(el
[a
])!=type(1):
472 for w
in total
.keys():
474 del total
[w
] # remove entry if not defined
478 if a
[-10:]=='AvgCounter':
479 # this is an average counter, calc the average of the referred element
480 # like InfoAge=InfoAge/InfoAgeAvgCounter
482 tel
[aorg
]=tel
[aorg
]/tel
[a
]
483 # the avgcount totals are just for internal purposes
def get_xml_total(self, indent_tab=xmlFormat.DEFAULT_TAB, leading_tab=""):
    """Return the totals of self.get_total() formatted by xmlFormat.class2string."""
    total = self.get_total()
    # NOTE(review): the argument line that followed `total,` was missing
    # from the text under review; passing "total" as the instance name is
    # an assumption -- confirm against xmlFormat.class2string.
    return xmlFormat.class2string(total,
                                  "total",
                                  indent_tab=indent_tab, leading_tab=leading_tab)
def get_updated(self):
    """Return the timestamp of the most recent log* call (self.updated)."""
    # The body was missing from the text under review; returning the
    # attribute the accessor is named after is the only sensible content.
    return self.updated
def get_xml_updated(self, indent_tab=xmlFormat.DEFAULT_TAB, leading_tab=""):
    """Return self.updated formatted by xmlFormat.dict2string.

    Two groups are produced with the timeConversion helpers: "UTC"
    (unixtime, ISO8601, RFC2822) and "Local" (ISO8601, RFC2822, human).
    They are passed on with dict_name="updated" and el_name="timezone".
    """
    when = self.updated
    utc_part = {"unixtime": timeConversion.getSeconds(when),
                "ISO8601": timeConversion.getISO8601_UTC(when),
                "RFC2822": timeConversion.getRFC2822_UTC(when)}
    local_part = {"ISO8601": timeConversion.getISO8601_Local(when),
                  "RFC2822": timeConversion.getRFC2822_Local(when),
                  "human": timeConversion.getHuman(when)}
    xml_updated = {"UTC": utc_part, "Local": local_part}
    return xmlFormat.dict2string(xml_updated,
                                 dict_name="updated", el_name="timezone",
                                 subtypes_params={"class": {}},
                                 indent_tab=indent_tab, leading_tab=leading_tab)
def write_file(self):
    """Write the status snapshot and update one RRD per frontend plus totals.

    Steps, as far as the visible code shows:
      1. skip everything when the files were written less than 5 seconds
         (in self.updated terms) before the current state;
      2. write "schedd_status.xml" through monitoringConfig.write_file;
      3. for the totals and for every frontend, build one value per
         "<type prefix><attribute>" and pass them to
         monitoringConfig.write_rrd_multi as "<dir>/Status_Attributes";
      4. remember self.updated in self.files_updated.

    NOTE(review): a number of statements were missing from the text under
    review and are reconstructed: the early return, the binding of `data`
    (assumed self.get_data()), the directory and element used for the
    totals (assumed "total" and self.get_total()), `fe_el`, `val_dict`,
    `fe_el_tp`, `a_el` and the `continue`.  Confirm against version control.
    """
    if (self.files_updated is not None) and ((self.updated - self.files_updated) < 5):
        # files updated recently, no need to redo it
        return

    # write snapshot file
    tab = xmlFormat.DEFAULT_TAB
    xml_str = ('<?xml version="1.0" encoding="ISO-8859-1"?>\n\n' +
               '<glideFactoryEntryQStats>\n' +
               self.get_xml_updated(indent_tab=tab, leading_tab=tab) + "\n" +
               self.get_xml_data(indent_tab=tab, leading_tab=tab) + "\n" +
               self.get_xml_total(indent_tab=tab, leading_tab=tab) + "\n" +
               "</glideFactoryEntryQStats>\n")
    monitoringConfig.write_file("schedd_status.xml", xml_str)

    data = self.get_data()
    total_el = self.get_total()

    # update RRDs
    type_strings = {'Status': 'Status', 'Requested': 'Req', 'ClientMonitor': 'Client'}
    # list(): dict.keys() cannot be added to a list on every Python version
    for fe in [None] + list(data.keys()):
        if fe is None:  # special key == Total
            fe_dir = "total"
            fe_el = total_el
        else:
            fe_dir = "frontend_" + fe
            fe_el = data[fe]

        val_dict = {}
        # init, so that all get created properly
        # NOTE(review): type_strings has no entry for 'Jobs', 'Matched' or
        # 'Slots'; if self.attributes carries those keys this lookup raises
        # KeyError.  Left as is because the intended mapping is not visible.
        for tp in self.attributes:
            tp_str = type_strings[tp]
            for a in self.attributes[tp]:
                val_dict["%s%s" % (tp_str, a)] = None

        monitoringConfig.establish_dir(fe_dir)
        for tp in fe_el:
            # type - Status, Requested or ClientMonitor
            if tp not in self.attributes:
                continue

            tp_str = type_strings[tp]
            attributes_tp = self.attributes[tp]

            fe_el_tp = fe_el[tp]
            for a in fe_el_tp:
                if a in attributes_tp:
                    a_el = fe_el_tp[a]
                    if not isinstance(a_el, dict):  # ignore subdictionaries
                        val_dict["%s%s" % (tp_str, a)] = a_el

        monitoringConfig.write_rrd_multi("%s/Status_Attributes" % fe_dir,
                                         "GAUGE", self.updated, val_dict)

    self.files_updated = self.updated
572 ############### P R I V A T E ################
574 ##################################################
def tmp2final(fname):
    """Promote "<fname>.tmp" to "<fname>", keeping the old file as "<fname>~".

    Every step is best effort: a missing previous version or backup is not
    an error, and a failure of the final rename is only reported on stdout
    so that monitoring problems never stop the caller.
    """
    backup = fname + "~"

    # Drop the stale backup first: os.rename refuses to replace an existing
    # destination on some platforms (e.g. Windows).
    # NOTE(review): this step is not in the text under review; it was added
    # for that reason and is harmless where rename overwrites.
    try:
        os.remove(backup)
    except OSError:
        pass  # no previous backup

    try:
        os.rename(fname, backup)
    except OSError:
        pass  # first write, nothing to back up

    try:
        os.rename(fname + ".tmp", fname)
    except OSError:
        print("Failed renaming %s.tmp into %s" % (fname, fname))
593 ##################################################
# global configuration of the module
# One shared MonitoringConfig instance, created at import time; the
# write_file methods of the stats classes call its write_file,
# establish_dir and write_rrd_multi helpers.
monitoringConfig=MonitoringConfig()