A new doc condor_config.html is being added showing the condor configuration
[burt-test.git] / frontend / glideinFrontendLib.py
blob20af9ca2488588a140f34212cb27fba037bb840c
2 # Project:
3 # glideinWMS
5 # File Version:
6 # $Id: glideinFrontendLib.py,v 1.35 2011/05/10 00:17:38 sfiligoi Exp $
8 # Description:
9 # This module implements the functions needed to keep the
10 # required number of idle glideins
11 #   plus other miscellaneous functions
13 # Author:
14 # Igor Sfiligoi (Sept 19th 2006)
17 import os.path
18 import sets,string,math
19 import condorMonitor,condorExe
20 import logSupport
class LogFiles:
    """Bundle of the frontend log files (activity, warning, debug) plus a
    cleanup helper that prunes aged or oversized logs.

    All logging methods are deliberately written so that they never raise:
    a failure to log must not take down the frontend.
    """
    def __init__(self,log_dir,max_days,min_days,max_mbs):
        # log_dir  - directory where all the frontend logs live
        # max_days - age (days) after which a log file is always removed
        # min_days - age (days) below which a log file is never removed
        # max_mbs  - total space budget for the log directory, in MB
        self.log_dir=log_dir
        log_base=os.path.join(log_dir,"frontend")
        self.activity_log=logSupport.DayLogFile(log_base,"info.log")
        self.warning_log=logSupport.DayLogFile(log_base,"err.log")
        self.debug_log=logSupport.DayLogFile(log_base,"debug.log")
        self.cleanupObj=logSupport.DirCleanupWSpace(log_dir,"(frontend\.[0-9]*\.info\.log)|(frontend\.[0-9]*\.err\.log)|(frontend\.[0-9]*\.debug\.log)",
                                                    int(max_days*24*3600),int(min_days*24*3600),
                                                    long(max_mbs*(1024.0*1024.0)),
                                                    self.activity_log,self.warning_log)

    def logActivity(self,str):
        """Append a line to the activity (info) log.

        On failure the problem is reported through the warning log only;
        log_in_activity=False avoids recursing back into this method.
        """
        try:
            self.activity_log.write(str)
        except:
            # logging must never throw an exception!
            self.logWarning("logActivity failed, was logging: %s"%str,False)

    def logWarning(self,str, log_in_activity=True):
        """Append a line to the warning (err) log; optionally mirror it
        into the activity log as well."""
        try:
            self.warning_log.write(str)
        except:
            # logging must never throw an exception!
            # silently ignore
            pass
        if log_in_activity:
            self.logActivity("WARNING: %s"%str)

    def logDebug(self,str):
        """Append a line to the debug log; failures are silently ignored."""
        try:
            self.debug_log.write(str)
        except:
            # logging must never throw an exception!
            # silently ignore
            pass

    def cleanup(self):
        """Run the log-directory cleanup; a failure only produces a warning."""
        try:
            self.cleanupObj.cleanup()
        except:
            # cleanup, like logging, must never throw an exception!
            self.logWarning("log cleanup failed.")
# Module-global LogFiles instance used by the functions below for logging.
# someone (the frontend startup code) needs to initialize this
# type LogFiles
log_files=None
69 #############################################################################################
72 # Return a dictionary of schedds containing interesting jobs
73 # Each element is a condorQ
75 # If not all the jobs of the schedd have to be considered,
76 # specify the appropriate constraint
def getCondorQ(schedd_names,constraint=None,format_list=None):
    """Return a dictionary of schedds containing interesting jobs.

    Each element is a condorQ; only idle (JobStatus==1) or running
    (JobStatus==2) jobs are considered interesting.

    schedd_names - list of schedd names to query
    constraint   - optional additional constraint, AND-ed with the job-status one
    format_list  - optional attribute format list; when given it is completed
                   with the attributes this module relies on
    """
    # idiom fix: identity comparison for None instead of '!='
    if format_list is not None:
        format_list=condorMonitor.complete_format_list(format_list,[('JobStatus','i'),('EnteredCurrentStatus','i'),('ServerTime','i'),('RemoteHost','s')])
    return getCondorQConstrained(schedd_names,"(JobStatus=?=1)||(JobStatus=?=2)",constraint,format_list)
84 # Return a dictionary of schedds containing idle jobs
85 # Each element is a condorQ
87 # Use the output of getCondorQ
def getIdleCondorQ(condorq_dict):
    """Given the output of getCondorQ, return a dictionary of schedds
    (each element a condorQ) restricted to idle (JobStatus==1) jobs."""
    def is_idle(el):
        # JobStatus 1 == Idle
        return el.has_key('JobStatus') and (el['JobStatus']==1)
    out={}
    for schedd_name in condorq_dict.keys():
        sq=condorMonitor.SubQuery(condorq_dict[schedd_name],is_idle)
        sq.load()
        out[schedd_name]=sq
    return out
98 # Return a dictionary of schedds containing running jobs
99 # Each element is a condorQ
101 # Use the output of getCondorQ
def getRunningCondorQ(condorq_dict):
    """Given the output of getCondorQ, return a dictionary of schedds
    (each element a condorQ) restricted to running (JobStatus==2) jobs."""
    def is_running(el):
        # JobStatus 2 == Running
        return el.has_key('JobStatus') and (el['JobStatus']==2)
    out={}
    for schedd_name in condorq_dict.keys():
        sq=condorMonitor.SubQuery(condorq_dict[schedd_name],is_running)
        sq.load()
        out[schedd_name]=sq
    return out
def appendRealRunning(condorq_dict, status_dict):
    """Annotate each job in condorq_dict, in place, with a 'RunningOn' key
    identifying the glidein it is running on.

    condorq_dict - output of getCondorQ/getRunningCondorQ (schedd -> condorQ)
    status_dict  - output of getCondorStatus (collector -> condorStatus)

    'RunningOn' is "entry@name@factory@factory_pool" when the job's
    RemoteHost is found in one of the collectors, 'UNKNOWN' otherwise.
    """
    for schedd_name in condorq_dict:
        condorq = condorq_dict[schedd_name].fetchStored()

        for jid in condorq:
            found = False

            if condorq[jid].has_key('RemoteHost'):
                remote_host = condorq[jid]['RemoteHost']

                # look for the execution host among all collectors
                for collector_name in status_dict:
                    condor_status = status_dict[collector_name].fetchStored()
                    if remote_host in condor_status:
                        # there is currently no way to get the factory collector from
                        # condor status so this hack grabs the hostname of the schedd
                        schedd = condor_status[remote_host]['GLIDEIN_Schedd'].split('@')
                        if len(schedd) < 2:
                            # malformed GLIDEIN_Schedd: give up on this host
                            # (found stays False, so the job is marked UNKNOWN)
                            break

                        # split by : to remove port number if there
                        fact_pool = schedd[1].split(':')[0]

                        condorq[jid]['RunningOn'] = "%s@%s@%s@%s" % (
                            condor_status[remote_host]['GLIDEIN_Entry_Name'],
                            condor_status[remote_host]['GLIDEIN_Name'],
                            condor_status[remote_host]['GLIDEIN_Factory'],
                            fact_pool)
                        found = True
                        break

            if not found:
                condorq[jid]['RunningOn'] = 'UNKNOWN'
145 # Return a dictionary of schedds containing old jobs
146 # Each element is a condorQ
148 # Use the output of getCondorQ
def getOldCondorQ(condorq_dict,min_age):
    """Given the output of getCondorQ, return a dictionary of schedds
    (each element a condorQ) restricted to jobs that have been in their
    current status for at least min_age seconds."""
    def is_old(el):
        if not (el.has_key('ServerTime') and el.has_key('EnteredCurrentStatus')):
            return False
        return (el['ServerTime']-el['EnteredCurrentStatus'])>=min_age
    out={}
    for schedd_name in condorq_dict.keys():
        sq=condorMonitor.SubQuery(condorq_dict[schedd_name],is_old)
        sq.load()
        out[schedd_name]=sq
    return out
159 # Return the number of jobs in the dictionary
160 # Use the output of getCondorQ
def countCondorQ(condorq_dict):
    """Return the total number of jobs held in condorq_dict
    (the output of getCondorQ)."""
    total=0
    for condorq in condorq_dict.values():
        total+=len(condorq.fetchStored())
    return total
169 # Return a set of users present in the dictionary
170 # Needs "User" attribute
def getCondorQUsers(condorq_dict):
    """Return the set of 'User' attribute values across all jobs in
    condorq_dict (the output of getCondorQ).
    Every job is expected to carry the "User" attribute."""
    users_set=sets.Set()
    for schedd_name in condorq_dict.keys():
        for job in condorq_dict[schedd_name].fetchStored().values():
            users_set.add(job['User'])

    return users_set
184 # Get the number of jobs that match each glidein
186 # match_obj = compile('(job["MIN_NAME"]<glidein["MIN_NAME"]) && (job["ARCH"]==glidein["ARCH"])',"<string>","eval")
187 # condorq_dict = output of getidlqCondorQ
188 # glidein_dict = output of interface.findGlideins
190 # Returns:
191 # tuple of 3 elements, where each is a
192 # dictionary of glidein name where elements are number of jobs matching
193 # The first one is a straight match
194 # The second one is the entry proportion based on unique subsets
195 # The third one contains only elements that can only run on this site
197 # A special "glidein name" of (None, None, None) is used for jobs
198 # that don't match any "real glidein name"
def countMatch(match_obj,condorq_dict,glidein_dict):
    """Get the number of jobs that match each glidein.

    match_obj    - compiled expression, e.g.
                   compile('(job["MIN_NAME"]<glidein["MIN_NAME"]) && (job["ARCH"]==glidein["ARCH"])',"<string>","eval"),
                   evaluated with 'job' and 'glidein' in scope
    condorq_dict - output of getIdleCondorQ
    glidein_dict - output of interface.findGlideins

    Returns a tuple of 3 elements, each a dictionary keyed by glidein name:
     - straight match counts
     - entry proportion based on unique subsets (rounded up with math.ceil)
     - number of jobs that can only run on this site
    A special "glidein name" of (None, None, None) is used for jobs
    that don't match any real glidein name.
    """
    out_glidein_counts={}
    # new_out_counts: keys are site indexes (numbers),
    # elements will be the number of real idle jobs associated with each site
    new_out_counts={}

    # set of all (schedd,job id) pairs present in the queues
    cq_jobs=sets.Set()
    for schedd in condorq_dict.keys():
        condorq_data=condorq_dict[schedd].fetchStored()
        for jid in condorq_data.keys():
            cq_jobs.add((schedd,jid))

    # one set of matching (schedd,job id) pairs per glidein,
    # in glidein_dict iteration order (site_index below refers to this order)
    list_of_all_jobs=[]

    for glidename in glidein_dict:
        glidein=glidein_dict[glidename]  # referenced by eval(match_obj)
        glidein_count=0
        jobs=sets.Set()
        for schedd in condorq_dict.keys():
            condorq_data=condorq_dict[schedd].fetchStored()
            schedd_count=0
            for jid in condorq_data.keys():
                job=condorq_data[jid]  # referenced by eval(match_obj)
                if eval(match_obj):
                    jobs.add((schedd,jid))
                    schedd_count+=1
            glidein_count+=schedd_count
        list_of_all_jobs.append(jobs)
        out_glidein_counts[glidename]=glidein_count

    # renamed from 'range'/'tuple' below: don't shadow the builtins
    (outvals,all_matched_jobs) = uniqueSets(list_of_all_jobs)
    count_unmatched=len(cq_jobs-all_matched_jobs)

    # unique_to_site: keys are site indexes, elements are num of unique jobs
    unique_to_site = {}
    # each outvals element is (set of site indexes, set of jobs shared by
    # exactly those sites); initialize both counters first to avoid key errors
    for (site_indexes,job_set) in outvals:
        for site_index in site_indexes:
            new_out_counts[site_index]=0.0
            unique_to_site[site_index]=0
    # for every (site index set, job set) pair, credit each member site with
    # an equal fraction of the shared jobs; may not be an integer
    for (site_indexes,job_set) in outvals:
        frac=1.0*len(job_set)/len(site_indexes)
        for site_index in site_indexes:
            new_out_counts[site_index]=new_out_counts[site_index]+frac
        # if the jobs are unique to a single site, remember that count;
        # bug fix: the original pop()ed from the set while iterating it,
        # which both corrupted outvals and broke the iteration
        if len(site_indexes)==1:
            for site_index in site_indexes:
                unique_to_site[site_index]=len(job_set)

    # list_of_sites[site_index]==glidename, same order as list_of_all_jobs
    # bug fix: the original indexed with a never-initialized counter 'i'
    # (NameError); a plain append preserves the intended ordering
    list_of_sites=[]
    for glidename in glidein_dict:
        list_of_sites.append(glidename)

    final_out_counts={}
    final_unique={}
    # translate new_out_counts/unique_to_site keys from site indexes to sites
    for glidename in glidein_dict:
        final_out_counts[glidename]=0
        final_unique[glidename]=0
    for site_index in new_out_counts:
        site=list_of_sites[site_index]
        # round the fractional share up to a whole number of jobs
        final_out_counts[site]=math.ceil(new_out_counts[site_index])
        final_unique[site]=unique_to_site[site_index]

    out_glidein_counts[(None,None,None)]=count_unmatched
    final_out_counts[(None,None,None)]=count_unmatched
    final_unique[(None,None,None)]=count_unmatched
    return (out_glidein_counts,final_out_counts,final_unique)
def countRealRunning(match_obj,condorq_dict,glidein_dict):
    """Return a dictionary: glidein name -> number of jobs that satisfy the
    compiled match_obj expression AND are actually running on that glidein,
    as recorded in the job's 'RunningOn' attribute (see appendRealRunning).
    """
    out_glidein_counts={}
    for glidename in glidein_dict:
        glidein=glidein_dict[glidename]  # referenced by eval(match_obj)
        # "entry@factory_host"; split by : to remove port number if there
        glide_str = "%s@%s" % (glidename[1],glidename[0].split(':')[0])
        running_count=0
        for schedd in condorq_dict.keys():
            condorq_data=condorq_dict[schedd].fetchStored()
            for jid in condorq_data.keys():
                job=condorq_data[jid]  # referenced by eval(match_obj)
                if eval(match_obj) and job['RunningOn'] == glide_str:
                    running_count+=1
        out_glidein_counts[glidename]=running_count
    return out_glidein_counts
307 # Convert frontend param expression in a value
309 # expr_obj = compile('glidein["MaxTimeout"]+frontend["MaxTimeout"]+600',"<string>","eval")
310 # frontend = the frontend const parameters
311 # glidein = glidein factory parameters
313 # Returns:
314 # The evaluated value
def evalParamExpr(expr_obj,frontend,glidein):
    """Convert a frontend param expression into a value and return it.

    expr_obj - compiled expression, e.g.
               compile('glidein["MaxTimeout"]+frontend["MaxTimeout"]+600',"<string>","eval")
    frontend - the frontend const parameters (visible to the expression)
    glidein  - glidein factory parameters (visible to the expression)
    """
    value=eval(expr_obj)
    return value
319 # Return a dictionary of collectors containing interesting classads
320 # Each element is a condorStatus
322 # If not all the classads of the collector have to be considered,
323 # specify the appropriate constraint
def getCondorStatus(collector_names,constraint=None,format_list=None):
    """Return a dictionary of collectors containing glidein classads.

    Each element is a condorStatus; monitor VMs and classads missing the
    glidein identity attributes are filtered out.

    collector_names - list of collector names to query
    constraint      - optional additional constraint, AND-ed with the glidein one
    format_list     - optional attribute format list; when given it is completed
                      with the attributes this module relies on
    """
    # idiom fix: identity comparison for None instead of '!='
    if format_list is not None:
        format_list=condorMonitor.complete_format_list(format_list,[('State','s'),('Activity','s'),('EnteredCurrentState','i'),('EnteredCurrentActivity','i'),('LastHeardFrom','i'),('GLIDEIN_Factory','s'),('GLIDEIN_Name','s'),('GLIDEIN_Entry_Name','s'),('GLIDECLIENT_Name','s'),('GLIDEIN_Schedd','s')])
    return getCondorStatusConstrained(collector_names,'(IS_MONITOR_VM=!=True)&&(GLIDEIN_Factory=!=UNDEFINED)&&(GLIDEIN_Name=!=UNDEFINED)&&(GLIDEIN_Entry_Name=!=UNDEFINED)',constraint,format_list)
331 # Return a dictionary of collectors containing idle(unclaimed) vms
332 # Each element is a condorStatus
334 # Use the output of getCondorStatus
def getIdleCondorStatus(status_dict):
    """Given the output of getCondorStatus, return a dictionary of collectors
    (each element a condorStatus) restricted to idle (Unclaimed/Idle) vms."""
    def is_idle_vm(el):
        return (el.has_key('State') and el.has_key('Activity') and
                (el['State']=="Unclaimed") and (el['Activity']=="Idle"))
    out={}
    for collector_name in status_dict.keys():
        sq=condorMonitor.SubQuery(status_dict[collector_name],is_idle_vm)
        sq.load()
        out[collector_name]=sq
    return out
345 # Return a dictionary of collectors containing running(claimed) vms
346 # Each element is a condorStatus
348 # Use the output of getCondorStatus
def getRunningCondorStatus(status_dict):
    """Given the output of getCondorStatus, return a dictionary of collectors
    (each element a condorStatus) restricted to running (Claimed and
    Busy/Retiring) vms."""
    def is_running_vm(el):
        return (el.has_key('State') and el.has_key('Activity') and
                (el['State']=="Claimed") and (el['Activity'] in ("Busy","Retiring")))
    out={}
    for collector_name in status_dict.keys():
        sq=condorMonitor.SubQuery(status_dict[collector_name],is_running_vm)
        sq.load()
        out[collector_name]=sq
    return out
359 # Return a dictionary of collectors containing idle(unclaimed) vms
360 # Each element is a condorStatus
362 # Use the output of getCondorStatus
def getClientCondorStatus(status_dict,frontend_name,group_name,request_name):
    """Given the output of getCondorStatus, return a dictionary of collectors
    (each element a condorStatus) restricted to the vms belonging to this
    frontend group and request.

    A vm matches either via the old-style GLIDECLIENT_Name
    ("request@frontend.group") or via the new-style one ("frontend.group")
    combined with the glidein identity ("entry@name@factory"==request_name).
    """
    client_name_old="%s@%s.%s"%(request_name,frontend_name,group_name)
    client_name_new="%s.%s"%(frontend_name,group_name)
    def is_mine(el):
        if not el.has_key('GLIDECLIENT_Name'):
            return False
        if el['GLIDECLIENT_Name']==client_name_old:
            return True
        if el['GLIDECLIENT_Name']!=client_name_new:
            return False
        glidein_id="%s@%s@%s"%(el['GLIDEIN_Entry_Name'],el['GLIDEIN_Name'],el['GLIDEIN_Factory'])
        return glidein_id==request_name
    out={}
    for collector_name in status_dict.keys():
        sq=condorMonitor.SubQuery(status_dict[collector_name],is_mine)
        sq.load()
        out[collector_name]=sq
    return out
375 # Return the number of vms in the dictionary
376 # Use the output of getCondorStatus
def countCondorStatus(status_dict):
    """Return the total number of vms held in status_dict
    (the output of getCondorStatus)."""
    total=0
    for status in status_dict.values():
        total+=len(status.fetchStored())
    return total
384 ############################################################
386 # I N T E R N A L - Do not use
388 ############################################################
391 # Return a dictionary of schedds containing jobs of a certain type
392 # Each element is a condorQ
394 # If not all the jobs of the schedd have to be considered,
395 # specify the appropriate additional constraint
397 def getCondorQConstrained(schedd_names,type_constraint,constraint=None,format_list=None):
398 out_condorq_dict={}
399 for schedd in schedd_names:
400 if schedd=='':
401 log_files.logWarning("Skipping empty schedd name")
402 continue
403 condorq=condorMonitor.CondorQ(schedd)
404 full_constraint=type_constraint[0:] #make copy
405 if constraint!=None:
406 full_constraint="(%s) && (%s)"%(full_constraint,constraint)
408 try:
409 condorq.load(full_constraint,format_list)
410 except condorExe.ExeError, e:
411 if schedd!=None:
412 log_files.logWarning("Failed to talk to schedd %s. See debug log for more details."%schedd)
413 log_files.logDebug("Failed to talk to schedd %s: %s"%(schedd, e))
414 else:
415 log_files.logWarning("Failed to talk to schedd. See debug log for more details.")
416 log_files.logDebug("Failed to talk to schedd: %s"%e)
417 continue # if schedd not found it is equivalent to no jobs in the queue
418 if len(condorq.fetchStored())>0:
419 out_condorq_dict[schedd]=condorq
420 return out_condorq_dict
423 # Return a dictionary of collectors containing classads of a certain kind
424 # Each element is a condorStatus
426 # If not all the classads of the collector have to be considered,
427 # specify the appropriate additional constraint
429 def getCondorStatusConstrained(collector_names,type_constraint,constraint=None,format_list=None):
430 out_status_dict={}
431 for collector in collector_names:
432 status=condorMonitor.CondorStatus(pool_name=collector)
433 full_constraint=type_constraint[0:] #make copy
434 if constraint!=None:
435 full_constraint="(%s) && (%s)"%(full_constraint,constraint)
437 try:
438 status.load(full_constraint,format_list)
439 except condorExe.ExeError, e:
440 if collector!=None:
441 log_files.logWarning("Failed to talk to collector %s. See debug log for more details."%collector)
442 log_files.logDebug("Failed to talk to collector %s: %s"%(collector, e))
443 else:
444 log_files.logWarning("Failed to talk to collector. See debug log for more details.")
445 log_files.logDebug("Failed to talk to collector: %s"%e)
446 continue # if collector not found it is equivalent to no classads
447 if len(status.fetchStored())>0:
448 out_status_dict[collector]=status
449 return out_status_dict
451 #############################################
453 # Extract unique subsets from a list of sets
454 # by Benjamin Hass @ UCSD (working under Igor Sfiligoi)
456 # Input: list of sets
457 # Output: list of (index set, value subset) pairs + a set that is the union of all input sets
459 # Example in:
460 # [Set([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]), Set([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]),
461 # Set([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20,
462 # 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35]),
463 # Set([11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30])]
464 # Example out:
465 # ([(Set([2]), Set([32, 33, 34, 35, 31])),
466 # (Set([0, 1, 2]), Set([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])),
467 # (Set([2, 3]), Set([11, 12, 13, 14, 15, 16, 17, 18, 19, 20,
468 # 21, 22, 23, 24, 25, 26, 27, 28, 29, 30]))],
469 # Set([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20,
470 # 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35]))
def uniqueSets(in_sets):
    """Extract unique subsets from a list of sets.
    (by Benjamin Hass @ UCSD, working under Igor Sfiligoi)

    in_sets - list of sets.Set objects

    Returns (outvals, sum_set) where:
     - outvals is a list of (index set, value subset) pairs: each value
       subset appears in exactly the input sets whose indexes are listed
     - sum_set is the union of all the input sets

    Example in:
      [Set([1..10]), Set([1..10]), Set([1..35]), Set([11..30])]
    Example out:
      ([(Set([2]), Set([31..35])),
        (Set([0, 1, 2]), Set([1..10])),
        (Set([2, 3]), Set([11..30]))],
       Set([1..35]))
    """
    # sorted_sets is incrementally rebuilt so that, after processing each
    # input set, its members are disjoint subsets covering everything seen
    sorted_sets=[]
    for i in in_sets:
        common_list = []
        common = sets.Set()
        new_unique = sets.Set()
        old_unique_list = []
        old_unique = sets.Set()
        new = []
        # make a list of the elements common to i
        # (current iteration of in_sets) and the existing sorted sets
        for k in sorted_sets:
            # for now, old_unique is a set with all elements of sorted_sets
            old_unique = old_unique | k
            common = k&i
            if common:
                common_list.append(common)
            else:
                pass
        # figure out which elements in i (note: i is rebound here,
        # shrinking to its unique part) and in old_unique are unique
        for j in common_list:
            i = i - j
            old_unique = old_unique - j
        # make a list of all the unique elements in sorted_sets
        for k in sorted_sets:
            old_unique_list.append(k&old_unique)
        new_unique=i
        # rebuild sorted_sets: new-unique part, old-unique parts, then
        # the common intersections; empty sets are dropped
        if new_unique:
            new.append(new_unique)
        for o in old_unique_list:
            if o:
                new.append(o)
        for c in common_list:
            if c:
                new.append(c)
        sorted_sets=new

    # set with all unique elements
    sum_set = sets.Set()
    for s in sorted_sets:
        sum_set = sum_set | s

    # the union goes last so the index pass below can skip it easily
    sorted_sets.append(sum_set)

    # index_list is a list of lists. Each list corresponds to
    # an element in sorted_sets, and contains the indexes of the input
    # sets that share elements with it
    index_list = []
    for s in sorted_sets:
        indexes = []
        temp_sets = in_sets[:]
        for t in temp_sets:
            if s & t:
                indexes.append(temp_sets.index(t))
                # blank the matched entry so .index() finds duplicates too
                temp_sets[temp_sets.index(t)]=sets.Set()
        index_list.append(indexes)

    # create output
    outvals=[]
    for i in range(len(index_list)-1): # last one contains all the values
        outvals.append((sets.Set(index_list[i]),sorted_sets[i]))
    return (outvals,sorted_sets[-1])