6 # $Id: glideinFrontendLib.py,v 1.35 2011/05/10 00:17:38 sfiligoi Exp $
9 # This module implements the functions needed to keep the
10 # required number of idle glideins
# plus other miscellaneous functions
14 # Igor Sfiligoi (Sept 19th 2006)
18 import sets
,string
,math
19 import condorMonitor
,condorExe
def __init__(self, log_dir, max_days, min_days, max_mbs):
    """Create the frontend day-based log files plus their cleanup helper.

    log_dir  -- directory where the "frontend" log files live
    max_days -- logs older than this many days are always removed
    min_days -- minimum age (days) before space-based cleanup may remove a log
    max_mbs  -- maximum total size (MB) the retained logs may occupy
    """
    log_base = os.path.join(log_dir, "frontend")
    self.activity_log = logSupport.DayLogFile(log_base, "info.log")
    self.warning_log = logSupport.DayLogFile(log_base, "err.log")
    self.debug_log = logSupport.DayLogFile(log_base, "debug.log")
    # purges aged/oversized frontend.*.{info,err,debug}.log files;
    # cleanup results are reported through the activity and warning logs
    self.cleanupObj = logSupport.DirCleanupWSpace(
        log_dir,
        "(frontend\.[0-9]*\.info\.log)|(frontend\.[0-9]*\.err\.log)|(frontend\.[0-9]*\.debug\.log)",
        int(max_days * 24 * 3600),
        int(min_days * 24 * 3600),
        long(max_mbs * (1024.0 * 1024.0)),
        self.activity_log,
        self.warning_log)
def logActivity(self, str):
    """Append *str* to the activity (info) log.

    Never raises: on write failure the problem is recorded in the
    warning log instead (without mirroring back into the activity log).
    NOTE: parameter name 'str' shadows the builtin; kept for backward
    compatibility with existing callers.
    """
    try:
        self.activity_log.write(str)
    except:
        # logging must never throw an exception!
        self.logWarning("logActivity failed, was logging: %s"%str, False)
def logWarning(self, str, log_in_activity=True):
    """Append *str* to the warning (err) log.

    If log_in_activity is True, also mirror the message into the
    activity log with a "WARNING:" prefix. Never raises on write failure.
    """
    try:
        self.warning_log.write(str)
    except:
        # logging must never throw an exception!
        # there is no safer channel left, so silently ignore
        pass
    if log_in_activity:
        self.logActivity("WARNING: %s"%str)
def logDebug(self, str):
    """Append *str* to the debug log.

    Never raises; write failures are silently ignored since no safer
    reporting channel exists.
    """
    try:
        self.debug_log.write(str)
    except:
        # logging must never throw an exception!
        pass
def cleanup(self):
    """Purge old/oversized log files via the cleanup object.

    Never raises: failures are reported through logWarning only.
    (Reconstructed: the original 'def' line was lost in extraction.)
    """
    try:
        self.cleanupObj.cleanup()
    except:
        # logging must never throw an exception!
        self.logWarning("log cleanup failed.")
65 # someone needs to initialize this
69 #############################################################################################
72 # Return a dictionary of schedds containing interesting jobs
73 # Each element is a condorQ
# If not all the jobs of the schedd have to be considered,
# specify the appropriate constraint
def getCondorQ(schedd_names, constraint=None, format_list=None):
    """Return a dictionary of schedd name -> condorQ of idle/running jobs.

    schedd_names -- list of schedd names to query
    constraint   -- optional extra constraint ANDed with the status one
    format_list  -- optional attribute format list; extended with the
                    attributes this module relies on downstream

    The format_list guard mirrors getCondorStatus: complete_format_list
    is only applied when a caller actually supplied a list.
    """
    if format_list is not None:
        format_list = condorMonitor.complete_format_list(
            format_list,
            [('JobStatus', 'i'), ('EnteredCurrentStatus', 'i'),
             ('ServerTime', 'i'), ('RemoteHost', 's')])
    # JobStatus 1 == Idle, 2 == Running
    return getCondorQConstrained(schedd_names,
                                 "(JobStatus=?=1)||(JobStatus=?=2)",
                                 constraint, format_list)
84 # Return a dictionary of schedds containing idle jobs
85 # Each element is a condorQ
87 # Use the output of getCondorQ
def getIdleCondorQ(condorq_dict):
    """Return {schedd_name: condorQ subquery} with only idle (JobStatus==1) jobs.

    condorq_dict -- output of getCondorQ
    Deprecated dict.has_key replaced with the 'in' operator (same semantics).
    """
    out = {}
    for schedd_name in condorq_dict.keys():
        sq = condorMonitor.SubQuery(
            condorq_dict[schedd_name],
            lambda el: ('JobStatus' in el and el['JobStatus'] == 1))
        sq.load()  # materialize the filtered job data
        out[schedd_name] = sq
    return out
98 # Return a dictionary of schedds containing running jobs
99 # Each element is a condorQ
101 # Use the output of getCondorQ
def getRunningCondorQ(condorq_dict):
    """Return {schedd_name: condorQ subquery} with only running (JobStatus==2) jobs.

    condorq_dict -- output of getCondorQ
    Deprecated dict.has_key replaced with the 'in' operator (same semantics).
    """
    out = {}
    for schedd_name in condorq_dict.keys():
        sq = condorMonitor.SubQuery(
            condorq_dict[schedd_name],
            lambda el: ('JobStatus' in el and el['JobStatus'] == 2))
        sq.load()  # materialize the filtered job data
        out[schedd_name] = sq
    return out
def appendRealRunning(condorq_dict, status_dict):
    """Annotate each stored job with a 'RunningOn' attribute (in place).

    For every job, its RemoteHost is looked up in the collector data;
    when found, 'RunningOn' is set to "entry@glidein@factory@factory_pool".
    Jobs whose glidein cannot be identified get 'RunningOn' == 'UNKNOWN'.

    condorq_dict -- {schedd_name: obj with fetchStored() -> {jid: job_dict}}
    status_dict  -- {collector_name: obj with fetchStored() -> {host: classad}}
    """
    for schedd_name in condorq_dict:
        condorq = condorq_dict[schedd_name].fetchStored()
        for jid in condorq:
            found = False
            if 'RemoteHost' in condorq[jid]:
                remote_host = condorq[jid]['RemoteHost']
                for collector_name in status_dict:
                    condor_status = status_dict[collector_name].fetchStored()
                    if remote_host in condor_status:
                        # there is currently no way to get the factory collector from
                        # condor status so this hack grabs the hostname of the schedd
                        schedd = condor_status[remote_host]['GLIDEIN_Schedd'].split('@')
                        if len(schedd) < 2:
                            # malformed GLIDEIN_Schedd (no '@'); cannot extract
                            # the factory pool, so fall through to UNKNOWN
                            break
                        # split by : to remove port number if there
                        fact_pool = schedd[1].split(':')[0]
                        condorq[jid]['RunningOn'] = "%s@%s@%s@%s" % (
                            condor_status[remote_host]['GLIDEIN_Entry_Name'],
                            condor_status[remote_host]['GLIDEIN_Name'],
                            condor_status[remote_host]['GLIDEIN_Factory'],
                            fact_pool)
                        found = True
                        break
            if not found:
                condorq[jid]['RunningOn'] = 'UNKNOWN'
145 # Return a dictionary of schedds containing old jobs
146 # Each element is a condorQ
148 # Use the output of getCondorQ
def getOldCondorQ(condorq_dict, min_age):
    """Return {schedd_name: condorQ subquery} with only jobs that have been
    in their current status for at least min_age seconds.

    condorq_dict -- output of getCondorQ
    min_age      -- minimum (ServerTime - EnteredCurrentStatus) in seconds
    """
    out = {}
    for schedd_name in condorq_dict.keys():
        sq = condorMonitor.SubQuery(
            condorq_dict[schedd_name],
            lambda el: ('ServerTime' in el and 'EnteredCurrentStatus' in el and
                        (el['ServerTime'] - el['EnteredCurrentStatus']) >= min_age))
        sq.load()  # materialize the filtered job data
        out[schedd_name] = sq
    return out
159 # Return the number of jobs in the dictionary
160 # Use the output of getCondorQ
def countCondorQ(condorq_dict):
    """Return the total number of jobs stored across all condorQ objects.

    condorq_dict -- output of getCondorQ (or one of its filtered variants)
    """
    count = 0
    for schedd_name in condorq_dict.keys():
        count += len(condorq_dict[schedd_name].fetchStored())
    return count
169 # Return a set of users present in the dictionary
170 # Needs "User" attribute
def getCondorQUsers(condorq_dict):
    """Return the set of 'User' values over all jobs in the condorQ dictionary.

    condorq_dict -- output of getCondorQ; every job must carry a "User"
                    attribute (KeyError otherwise, as in the original).
    Uses the builtin set type instead of the deprecated sets.Set; both
    support the iteration/len/membership operations callers rely on.
    """
    users_set = set()
    for schedd_name in condorq_dict.keys():
        condorq_data = condorq_dict[schedd_name].fetchStored()
        for jid in condorq_data.keys():
            job = condorq_data[jid]
            users_set.add(job['User'])
    return users_set
184 # Get the number of jobs that match each glidein
186 # match_obj = compile('(job["MIN_NAME"]<glidein["MIN_NAME"]) && (job["ARCH"]==glidein["ARCH"])',"<string>","eval")
187 # condorq_dict = output of getidlqCondorQ
188 # glidein_dict = output of interface.findGlideins
191 # tuple of 3 elements, where each is a
192 # dictionary of glidein name where elements are number of jobs matching
193 # The first one is a straight match
194 # The second one is the entry proportion based on unique subsets
195 # The third one contains only elements that can only run on this site
197 # A special "glidein name" of (None, None, None) is used for jobs
198 # that don't match any "real glidein name"
def countMatch(match_obj, condorq_dict, glidein_dict):
    """Count the number of jobs that match each glidein.

    match_obj    -- compiled expression, eval'ed with locals 'job' and
                    'glidein' in scope, e.g.
                    compile('(job["ARCH"]==glidein["ARCH"])',"<string>","eval")
    condorq_dict -- output of getIdleCondorQ
    glidein_dict -- output of interface.findGlideins

    Returns a tuple of 3 dictionaries keyed on glidein name:
      1. straight match counts
      2. entry proportion based on unique subsets (ceil'ed to ints)
      3. counts of jobs that can only run at that site
    A special "glidein name" of (None, None, None) holds the jobs that
    match no real glidein.

    NOTE(review): several interior statements were lost in extraction and
    have been reconstructed; verify against the project history.
    """
    out_glidein_counts = {}
    #new_out_counts: keys are site indexes(numbers),
    #elements will be the number of real
    #idle jobs associated with each site
    new_out_counts = {}

    # global set of (schedd, jobid) pairs, used to count unmatched jobs
    cq_jobs = sets.Set()
    for schedd in condorq_dict.keys():
        condorq = condorq_dict[schedd]
        condorq_data = condorq.fetchStored()
        for jid in condorq_data.keys():
            cq_jobs.add((schedd, jid))

    list_of_all_jobs = []
    for glidename in glidein_dict:
        glidein = glidein_dict[glidename]
        glidein_count = 0
        jobs = sets.Set()  # (schedd, jobid) pairs matching this glidein
        for schedd in condorq_dict.keys():
            condorq = condorq_dict[schedd]
            condorq_data = condorq.fetchStored()
            schedd_count = 0
            for jid in condorq_data.keys():
                job = condorq_data[jid]
                # the compiled expression references the 'job' and
                # 'glidein' locals above
                if eval(match_obj):
                    jobs.add((schedd, jid))
                    schedd_count += 1
            glidein_count += schedd_count
        list_of_all_jobs.append(jobs)
        out_glidein_counts[glidename] = glidein_count

    # renamed from 'range' to avoid shadowing the builtin
    (outvals, all_matched) = uniqueSets(list_of_all_jobs)
    count_unmatched = len(cq_jobs - all_matched)

    #unique_to_site: keys are sites, elements are num of unique jobs
    unique_to_site = {}
    #each tuple contains ([list of site_indexes],jobs associated with those sites)
    #this loop necessary to avoid key error
    for tup in outvals:
        for site_index in tup[0]:
            new_out_counts[site_index] = 0.0
            unique_to_site[site_index] = 0
    #for every tuple of([site_index],jobs), cycle through each site index
    #new_out_counts[site_index] is the number of jobs over the number
    #of indexes, may not be an integer.
    for tup in outvals:
        for site_index in tup[0]:
            new_out_counts[site_index] = new_out_counts[site_index] + (1.0 * len(tup[1]) / len(tup[0]))
        #if the site has jobs unique to it
        if len(tup[0]) == 1:
            # copy before pop so outvals is not mutated
            temp_sites = tup[0].copy()
            unique_to_site[temp_sites.pop()] = len(tup[1])

    #create a list of all sites, list_of_sites[site_index]=site
    list_of_sites = []
    i = 0
    for glidename in glidein_dict:
        list_of_sites.append(0)
        list_of_sites[i] = glidename
        i = i + 1

    final_out_counts = {}
    final_unique = {}
    # convert new_out_counts to final_out_counts and
    # unique_to_site to final_unique;
    # keys go from site indexes to sites
    for glidename in glidein_dict:
        final_out_counts[glidename] = 0
        final_unique[glidename] = 0
    for site_index in new_out_counts:
        site = list_of_sites[site_index]
        final_out_counts[site] = math.ceil(new_out_counts[site_index])
        final_unique[site] = unique_to_site[site_index]

    out_glidein_counts[(None, None, None)] = count_unmatched
    final_out_counts[(None, None, None)] = count_unmatched
    final_unique[(None, None, None)] = count_unmatched
    return (out_glidein_counts, final_out_counts, final_unique)
def countRealRunning(match_obj, condorq_dict, glidein_dict):
    """Count running jobs that match each glidein AND actually run on it.

    match_obj    -- compiled expression eval'ed with 'job' and 'glidein' locals
    condorq_dict -- output of getRunningCondorQ, annotated by appendRealRunning
                    (relies on the 'RunningOn' job attribute)
    glidein_dict -- output of interface.findGlideins; keys are tuples whose
                    [0] is the factory pool (possibly with :port) and [1]
                    the request name

    Returns {glidename: count}.
    """
    out_glidein_counts = {}
    for glidename in glidein_dict:
        # split by : to remove port number if there
        glide_str = "%s@%s" % (glidename[1], glidename[0].split(':')[0])
        glidein = glidein_dict[glidename]
        glidein_count = 0
        for schedd in condorq_dict.keys():
            condorq = condorq_dict[schedd]
            condorq_data = condorq.fetchStored()
            schedd_count = 0
            for jid in condorq_data.keys():
                job = condorq_data[jid]
                # 'job' and 'glidein' locals are referenced by match_obj
                if eval(match_obj) and job['RunningOn'] == glide_str:
                    schedd_count += 1
            glidein_count += schedd_count
        out_glidein_counts[glidename] = glidein_count
    return out_glidein_counts
307 # Convert frontend param expression in a value
309 # expr_obj = compile('glidein["MaxTimeout"]+frontend["MaxTimeout"]+600',"<string>","eval")
310 # frontend = the frontend const parameters
311 # glidein = glidein factory parameters
314 # The evaluated value
def evalParamExpr(expr_obj, frontend, glidein):
    """Evaluate a frontend parameter expression and return its value.

    expr_obj -- code object, e.g.
                compile('glidein["MaxTimeout"]+frontend["MaxTimeout"]+600',"<string>","eval")
    frontend -- the frontend const parameters
    glidein  -- glidein factory parameters
    """
    # the compiled expression resolves the names 'frontend' and 'glidein'
    # from this function's local scope
    return eval(expr_obj)
319 # Return a dictionary of collectors containing interesting classads
320 # Each element is a condorStatus
322 # If not all the jobs of the schedd has to be considered,
323 # specify the appropriate constraint
def getCondorStatus(collector_names, constraint=None, format_list=None):
    """Return a dictionary of collector name -> condorStatus of glidein slots.

    Only non-monitoring slots advertising the GLIDEIN_* identity attributes
    are kept; 'constraint' may restrict the query further.
    Idiom fix: identity comparison with None ('is not None' over '!=None').
    """
    if format_list is not None:
        format_list = condorMonitor.complete_format_list(
            format_list,
            [('State', 's'), ('Activity', 's'), ('EnteredCurrentState', 'i'),
             ('EnteredCurrentActivity', 'i'), ('LastHeardFrom', 'i'),
             ('GLIDEIN_Factory', 's'), ('GLIDEIN_Name', 's'),
             ('GLIDEIN_Entry_Name', 's'), ('GLIDECLIENT_Name', 's'),
             ('GLIDEIN_Schedd', 's')])
    return getCondorStatusConstrained(
        collector_names,
        '(IS_MONITOR_VM=!=True)&&(GLIDEIN_Factory=!=UNDEFINED)&&(GLIDEIN_Name=!=UNDEFINED)&&(GLIDEIN_Entry_Name=!=UNDEFINED)',
        constraint, format_list)
331 # Return a dictionary of collectors containing idle(unclaimed) vms
332 # Each element is a condorStatus
334 # Use the output of getCondorStatus
def getIdleCondorStatus(status_dict):
    """Return {collector_name: subquery} with only unclaimed, idle slots.

    status_dict -- output of getCondorStatus
    Deprecated dict.has_key replaced with the 'in' operator (same semantics).
    """
    out = {}
    for collector_name in status_dict.keys():
        sq = condorMonitor.SubQuery(
            status_dict[collector_name],
            lambda el: ('State' in el and 'Activity' in el and
                        el['State'] == "Unclaimed" and el['Activity'] == "Idle"))
        sq.load()  # materialize the filtered classad data
        out[collector_name] = sq
    return out
345 # Return a dictionary of collectors containing running(claimed) vms
346 # Each element is a condorStatus
348 # Use the output of getCondorStatus
def getRunningCondorStatus(status_dict):
    """Return {collector_name: subquery} with only claimed, busy/retiring slots.

    status_dict -- output of getCondorStatus
    Deprecated dict.has_key replaced with the 'in' operator (same semantics).
    """
    out = {}
    for collector_name in status_dict.keys():
        sq = condorMonitor.SubQuery(
            status_dict[collector_name],
            lambda el: ('State' in el and 'Activity' in el and
                        el['State'] == "Claimed" and
                        el['Activity'] in ("Busy", "Retiring")))
        sq.load()  # materialize the filtered classad data
        out[collector_name] = sq
    return out
359 # Return a dictionary of collectors containing idle(unclaimed) vms
360 # Each element is a condorStatus
362 # Use the output of getCondorStatus
def getClientCondorStatus(status_dict, frontend_name, group_name, request_name):
    """Return {collector_name: subquery} with only the slots this client requested.

    Matches both naming schemes:
      old -- GLIDECLIENT_Name == "request@frontend.group"
      new -- GLIDECLIENT_Name == "frontend.group" and the slot's
             "entry@glidein@factory" identity equals request_name
    status_dict -- output of getCondorStatus
    """
    client_name_old = "%s@%s.%s"%(request_name, frontend_name, group_name)
    client_name_new = "%s.%s"%(frontend_name, group_name)
    out = {}
    for collector_name in status_dict.keys():
        sq = condorMonitor.SubQuery(
            status_dict[collector_name],
            lambda el: ('GLIDECLIENT_Name' in el and
                        ((el['GLIDECLIENT_Name'] == client_name_old) or
                         ((el['GLIDECLIENT_Name'] == client_name_new) and
                          (("%s@%s@%s"%(el['GLIDEIN_Entry_Name'],
                                        el['GLIDEIN_Name'],
                                        el['GLIDEIN_Factory'])) == request_name)))))
        sq.load()  # materialize the filtered classad data
        out[collector_name] = sq
    return out
375 # Return the number of vms in the dictionary
376 # Use the output of getCondorStatus
def countCondorStatus(status_dict):
    """Return the total number of slots stored across all condorStatus objects.

    status_dict -- output of getCondorStatus (or one of its filtered variants)
    """
    count = 0
    for collector_name in status_dict.keys():
        count += len(status_dict[collector_name].fetchStored())
    return count
384 ############################################################
386 # I N T E R N A L - Do not use
388 ############################################################
391 # Return a dictionary of schedds containing jobs of a certain type
392 # Each element is a condorQ
394 # If not all the jobs of the schedd has to be considered,
395 # specify the appropriate additional constraint
def getCondorQConstrained(schedd_names, type_constraint, constraint=None, format_list=None):
    """Return {schedd_name: condorQ} for schedds holding jobs of a certain type.

    schedd_names    -- schedd names to query; empty names are skipped
    type_constraint -- base constraint, always applied
    constraint      -- optional extra constraint, ANDed to the base one
    format_list     -- optional attribute format list passed to the query

    Schedds that are unreachable or hold no matching jobs are omitted.
    Py2-only 'except X, e' syntax replaced with 'except X as e'
    (equivalent, also valid on Python 2.6+).
    """
    out_condorq_dict = {}
    for schedd in schedd_names:
        if schedd == '':
            log_files.logWarning("Skipping empty schedd name")
            continue
        condorq = condorMonitor.CondorQ(schedd)
        full_constraint = type_constraint[0:] #make copy
        if constraint is not None:
            full_constraint = "(%s) && (%s)"%(full_constraint, constraint)
        try:
            condorq.load(full_constraint, format_list)
        except condorExe.ExeError as e:
            if schedd is not None:
                log_files.logWarning("Failed to talk to schedd %s. See debug log for more details."%schedd)
                log_files.logDebug("Failed to talk to schedd %s: %s"%(schedd, e))
            else:
                log_files.logWarning("Failed to talk to schedd. See debug log for more details.")
                log_files.logDebug("Failed to talk to schedd: %s"%e)
            continue # if schedd not found it is equivalent to no jobs in the queue
        if len(condorq.fetchStored()) > 0:
            out_condorq_dict[schedd] = condorq
    return out_condorq_dict
423 # Return a dictionary of collectors containing classads of a certain kind
424 # Each element is a condorStatus
426 # If not all the jobs of the schedd has to be considered,
427 # specify the appropriate additional constraint
def getCondorStatusConstrained(collector_names, type_constraint, constraint=None, format_list=None):
    """Return {collector_name: condorStatus} of classads of a certain kind.

    collector_names -- collectors to query
    type_constraint -- base constraint, always applied
    constraint      -- optional extra constraint, ANDed to the base one
    format_list     -- optional attribute format list passed to the query

    Collectors that are unreachable or return no matching classads are
    omitted. Py2-only 'except X, e' syntax replaced with 'except X as e'
    (equivalent, also valid on Python 2.6+).
    """
    out_status_dict = {}
    for collector in collector_names:
        status = condorMonitor.CondorStatus(pool_name=collector)
        full_constraint = type_constraint[0:] #make copy
        if constraint is not None:
            full_constraint = "(%s) && (%s)"%(full_constraint, constraint)
        try:
            status.load(full_constraint, format_list)
        except condorExe.ExeError as e:
            if collector is not None:
                log_files.logWarning("Failed to talk to collector %s. See debug log for more details."%collector)
                log_files.logDebug("Failed to talk to collector %s: %s"%(collector, e))
            else:
                log_files.logWarning("Failed to talk to collector. See debug log for more details.")
                log_files.logDebug("Failed to talk to collector: %s"%e)
            continue # if collector not found it is equivalent to no classads
        if len(status.fetchStored()) > 0:
            out_status_dict[collector] = status
    return out_status_dict
451 #############################################
453 # Extract unique subsets from a list of sets
454 # by Benjamin Hass @ UCSD (working under Igor Sfiligoi)
456 # Input: list of sets
457 # Output: list of (index set, value subset) pairs + a set that is the union of all input sets
460 # [Set([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]), Set([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]),
461 # Set([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20,
462 # 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35]),
463 # Set([11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30])]
465 # ([(Set([2]), Set([32, 33, 34, 35, 31])),
466 # (Set([0, 1, 2]), Set([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])),
467 # (Set([2, 3]), Set([11, 12, 13, 14, 15, 16, 17, 18, 19, 20,
468 # 21, 22, 23, 24, 25, 26, 27, 28, 29, 30]))],
469 # Set([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20,
470 # 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35]))
def uniqueSets(in_sets):
    """Extract unique subsets from a list of sets.

    in_sets -- list of sets
    Returns (outvals, union): outvals is a list of
    (set of input indexes, value subset) pairs partitioning the union of
    all inputs, and union is the set of every value seen in any input.

    NOTE(review): several interior statements were lost in extraction and
    have been reconstructed; verify against the project history. The
    deprecated 'sets' module is kept deliberately so this function stays
    interoperable with the sets.Set objects countMatch passes in.
    """
    # sorted_sets holds the growing partition of disjoint subsets
    sorted_sets = []
    for i in in_sets:
        common_list = []       # parts of i shared with each existing subset
        common = sets.Set()    # elements of i already present in sorted_sets
        new = []
        new_unique = sets.Set()
        old_unique = sets.Set()
        #make a list of the elements common to i
        #(current iteration of sets) and the existing
        #sorted sets
        for k in sorted_sets:
            #for now, old unique is a set with all elements of
            #the existing sorted sets
            old_unique = old_unique | k
            common_list.append(k & i)
            common = common | (k & i)
        #figure out which elements in i
        # and which elements in old_uniques are unique
        new_unique = i - common
        for j in common_list:
            old_unique = old_unique - j
        #make a list of all the unique elements in sorted_sets
        old_unique_list = []
        for k in sorted_sets:
            old_unique_list.append(k & old_unique)
        new.append(new_unique)
        for o in old_unique_list:
            if o:
                new.append(o)
        for c in common_list:
            if c:
                new.append(c)
        sorted_sets = new
    # set with all unique elements
    sum_set = sets.Set()
    for s in sorted_sets:
        sum_set = sum_set | s
    sorted_sets.append(sum_set)
    # index_list is a list of lists. Each list corresponds to
    # an element in sorted_sets, and contains the indexes of
    # that elements shared elements in the initial list of sets
    index_list = []
    for s in sorted_sets:
        indexes = []
        temp_sets = in_sets[:]
        for t in temp_sets:
            if s & t:
                indexes.append(temp_sets.index(t))
                # blank out the matched entry so duplicate input sets
                # yield distinct indexes
                temp_sets[temp_sets.index(t)] = sets.Set()
        index_list.append(indexes)
    outvals = []
    for i in range(len(index_list) - 1): # last one contains all the values
        outvals.append((sets.Set(index_list[i]), sorted_sets[i]))
    return (outvals, sorted_sets[-1])