A new doc condor_config.html is being added showing the condor configuration
[burt-test.git] / tools / lib / glideinMonitor.py
blob4cfc6edd501ef5cf267b40db8422d6d6bde80ad9
2 # Project:
3 # glideinWMS
5 # File Version:
6 # $Id: glideinMonitor.py,v 1.21 2011/02/10 21:35:32 parag Exp $
8 # Description:
9 # This module implements helper functions
10 # used to perform pseudo-interactive monitoring
12 # Prerequisites:
13 # The startd must be configured with exactly 2 slots, called vm
# It must have the cross-VM expressions State and RemoteUser enabled.
# It must also advertise that it has the monitor VM.
17 # Author:
18 # Igor Sfiligoi (May 2007)
21 import string
22 import time
23 import tempfile
24 import shutil
25 import os
26 import os.path
27 import sys
29 # This should be done by the user of the module
30 #sys.path.append("../../lib")
32 import condorMonitor
33 import condorManager
35 # returns a dictionary of jid,schedd_name,pool_name, timeout and argv
36 # the argv contains the arguments not parsed by the function
def parseArgs(argv):
    """Parse the command line of a pseudo-interactive monitoring tool.

    Recognized options, each followed by exactly one value:
      -name <schedd>    stored as 'schedd_name' (default None)
      -pool <pool>      stored as 'pool_name'   (default None)
      -timeout <secs>   stored as 'timeout', converted with int() (default 130)
    The first non-option element is taken as the job ID ('jid').  Parsing
    stops at the next non-option element; that element and everything after
    it are returned untouched in 'argv' for the caller to interpret.

    Returns a dictionary with keys 'schedd_name', 'pool_name', 'timeout',
    'jid' and 'argv'.

    Raises RuntimeError if no job ID is given or if an option is missing its
    value; ValueError if the -timeout value is not an integer.
    """
    outdict={'schedd_name':None,'pool_name':None,
             'timeout':130} #default
    # option flag -> key in outdict
    flag_keys={'-name':'schedd_name',
               '-pool':'pool_name',
               '-timeout':'timeout'}
    jid=None
    alen=len(argv)
    i=0
    while i<alen:
        ael=argv[i]
        if ael in flag_keys:
            i=i+1
            if i>=alen:
                # previously this surfaced as an unhelpful IndexError
                raise RuntimeError("Option %s requires a value"%ael)
            if ael=='-timeout':
                outdict['timeout']=int(argv[i])
            else:
                outdict[flag_keys[ael]]=argv[i]
        elif jid is None:
            jid=ael
        else:
            # second non-option element: it and the rest belong to the caller
            break
        i=i+1

    if jid is None:
        raise RuntimeError('JID not found')
    outdict['jid']=jid
    outdict['argv']=argv[i:]
    return outdict
69 # createMonitorFile is a callback with the following arguments:
70 # (monitor_file_name,monitor_control_relname,argv,condor_status,monitorVM):
def monitor(jid,schedd_name,pool_name,
            timeout,
            createMonitorFile,argv,
            stdout_fd=sys.stdout,
            stderr_fd=sys.stderr):
    """Run a monitoring command alongside a running Condor job and print its output.

    Steps:
      1. find the slot running job `jid` and its paired monitoring slot,
         and validate that slot;
      2. have createMonitorFile write the monitor executable into a
         temporary directory;
      3. submit a monitoring job pinned to the monitoring slot;
      4. wait up to `timeout` seconds for the control file to appear;
      5. copy the monitor's stdout/stderr to stdout_fd/stderr_fd.
    If waiting or printing fails, the monitoring job is removed before the
    exception propagates.  The temporary directory is always deleted.

    jid               -- job to monitor, as "Cluster.Proc"
    schedd_name,
    pool_name         -- passed through to the Condor queries and submission
    timeout           -- seconds to wait for the monitor to reply
    createMonitorFile -- callback(monitor_file_name, monitor_control_relname,
                         argv, condor_status, monitorVM) writing the executable
    argv              -- forwarded unchanged to createMonitorFile
    stdout_fd,
    stderr_fd         -- file-like objects receiving the monitor's output

    Raises RuntimeError for an invalid jid, an unusable job or slot, a missing
    X509_USER_PROXY on gLExec resources, or a timeout.
    """
    jid_cluster,jid_proc=_splitJid(jid)
    constraint="(ClusterId=?=%s) && (ProcId=?=%s)"%(jid_cluster,jid_proc)

    remoteVM=getRemoteVM(pool_name,schedd_name,constraint)
    monitorVM=getMonitorVM(pool_name,remoteVM)

    condor_status=getMonitorVMStatus(pool_name,monitorVM)
    validateMonitorVMStatus(condor_status,monitorVM)

    x509_file=_getX509File(condor_status)

    tmpdir=tempfile.mkdtemp(prefix="glidein_intmon_")
    try:
        sname=os.path.join(tmpdir,"mon.submit")
        mfname=os.path.join(tmpdir,"mon.sh")
        mfout=os.path.join(tmpdir,"mon.out")
        mferr=os.path.join(tmpdir,"mon.err")
        mlog=os.path.join(tmpdir,"mon.log")
        mc_relname="mon.done"
        mcname=os.path.join(tmpdir,mc_relname)
        createMonitorFile(mfname,mc_relname,argv,condor_status,monitorVM)
        createSubmitFile(tmpdir,sname,mlog,mfname,mfout,mferr,
                         monitorVM,timeout,x509_file)
        # id of the monitoring job (kept distinct from the monitored `jid`)
        mon_jid=condorManager.condorSubmitOne(sname,schedd_name,pool_name)
        try:
            checkFile(mcname,schedd_name,pool_name,timeout,reschedule_freq=10)
            printFile(mfout,stdout_fd)
            printFile(mferr,stderr_fd)
        except:
            # Deliberately catch everything (including KeyboardInterrupt) so
            # the monitoring job never outlives a failed or aborted wait.
            condorManager.condorRemoveOne(mon_jid,schedd_name,pool_name)
            raise
    finally:
        shutil.rmtree(tmpdir)
    return

def _splitJid(jid):
    """Split a "Cluster.Proc" job id into its (cluster, proc) digit strings.

    Raises RuntimeError if jid is not two non-negative integers joined by '.'.
    """
    try:
        jid_cluster,jid_proc=jid.split(".",1)
    except (AttributeError,ValueError):
        raise RuntimeError('Invalid JID %s, expected Cluster.Proc'%(jid,))
    # Both parts are pasted into a ClassAd constraint, so accept only plain
    # integers; this also rejects malformed ids such as "1.2.3".
    if not (jid_cluster.isdigit() and jid_proc.isdigit()):
        raise RuntimeError('Invalid JID %s, expected Cluster.Proc'%(jid,))
    return jid_cluster,jid_proc

def _getX509File(condor_status):
    """Return the proxy file to submit with, or None if gLExec is not in use.

    Raises RuntimeError if the slot uses gLExec but X509_USER_PROXY is unset.
    """
    # If the attributes are not published, assume no gLExec
    # (GLEXEC_STARTER: old way, GLEXEC_JOB: new way).
    glexec_starter=condor_status.get('GLEXEC_STARTER',False)
    glexec_job=condor_status.get('GLEXEC_JOB',False)
    if not (glexec_starter or glexec_job):
        return None
    if 'X509_USER_PROXY' not in os.environ:
        raise RuntimeError("Job running on a gLExec enabled resource; X509_USER_PROXY must be defined")
    return os.environ['X509_USER_PROXY']
131 ######## Internal ############
def getRemoteVM(pool_name,schedd_name,constraint):
    """Return the RemoteHost of the single running job matching constraint.

    The schedd queue is queried through condorMonitor.CondorQ.  Raises
    RuntimeError when no job or more than one job matches, when the job's
    JobStatus is missing or not 2, or when RemoteHost is not yet set.
    """
    queue=condorMonitor.CondorQ(schedd_name=schedd_name,pool_name=pool_name)
    jobs=queue.fetch(constraint)

    njobs=len(jobs)
    if njobs==0:
        raise RuntimeError("Job not found")
    if njobs>1:
        raise RuntimeError("Can handle only one job at a time")

    job=list(jobs.values())[0]
    # JobStatus 2 is treated as "running"; anything else cannot be monitored
    if not ('JobStatus' in job and job['JobStatus']==2):
        raise RuntimeError("Job not running")
    # a running job without RemoteHost has not been placed on a slot yet
    if 'RemoteHost' not in job:
        raise RuntimeError("Job still starting")
    return job['RemoteHost']
def getMonitorVM(pool_name,jobVM):
    """Return the name of the monitoring slot paired with slot jobVM.

    Reads IS_MONITOR_VM, HAS_MONITOR_VM and Monitoring_Name from the
    collector.  Raises RuntimeError if jobVM is not found, does not support
    monitoring, is itself a monitoring slot, or does not publish
    Monitoring_Name.
    """
    wanted=[('IS_MONITOR_VM','b'),('HAS_MONITOR_VM','b'),('Monitoring_Name','s')]
    collector=condorMonitor.CondorStatus(pool_name=pool_name)
    slots=collector.fetch(constraint='(Name=="%s")'%jobVM,format_list=wanted)
    if jobVM not in slots:
        raise RuntimeError("Job claims it runs on %s, but cannot find it!"%jobVM)

    slot=slots[jobVM]
    # both flags must be published before their values are examined
    if not ('HAS_MONITOR_VM' in slot and 'IS_MONITOR_VM' in slot):
        raise RuntimeError("Slot %s does not support monitoring!"%jobVM)
    if not (slot['HAS_MONITOR_VM']==True):
        raise RuntimeError("Slot %s does not support monitoring! HAS_MONITOR_VM not True."%jobVM)
    if not (slot['IS_MONITOR_VM']==False):
        raise RuntimeError("Slot %s is a monitoring slot itself! Cannot monitor."%jobVM)
    if 'Monitoring_Name' not in slot:
        raise RuntimeError("Slot %s does not publish the monitoring slot!"%jobVM)
    return slot['Monitoring_Name']
def getMonitorVMStatus(pool_name,monitorVM):
    """Fetch the monitoring-related attributes published by slot monitorVM.

    Returns the attribute dictionary that condorMonitor.CondorStatus.fetch
    reports for monitorVM.  Raises RuntimeError if the slot is not found.
    """
    wanted=[('IS_MONITOR_VM','b'),
            ('HAS_MONITOR_VM','b'),
            ('State','s'),
            ('Activity','s'),
            ('vm2_State','s'),
            ('vm2_Activity','s'),
            ('GLEXEC_STARTER','b'),
            ('USES_MONITOR_STARTD','b'),
            ('GLEXEC_JOB','b')]
    collector=condorMonitor.CondorStatus(pool_name=pool_name)
    slots=collector.fetch(constraint='(Name=="%s")'%monitorVM,format_list=wanted)
    if monitorVM not in slots:
        raise RuntimeError("Monitor slot %s does not exist!"%monitorVM)
    return slots[monitorVM]
def validateMonitorVMStatus(condor_status,monitorVM):
    """Check that slot monitorVM can accept a monitoring job.

    condor_status is the attribute dictionary of the monitoring slot.  An
    attribute that is not published is treated as "not set":
      - a missing HAS_MONITOR_VM or IS_MONITOR_VM fails validation;
      - a missing Activity / vm2_Activity is not taken as 'Retiring'.
    (Previously these missing attributes escaped as KeyError.)

    Returns None; raises RuntimeError describing the first failed check.
    """
    if condor_status.get('HAS_MONITOR_VM')!=True:
        raise RuntimeError("Monitor slot %s does not allow monitoring"%monitorVM)
    if not (condor_status.get('IS_MONITOR_VM')==True):
        raise RuntimeError("Slot %s is not a monitoring slot!"%monitorVM)

    # State is deliberately not checked: the monitoring job is queued anyhow,
    # so the slot does not need to be ready right now.

    if condor_status.get('Activity')=='Retiring':
        raise RuntimeError("Job cannot be monitored anymore")

    if 'vm2_State' in condor_status:
        # only if has vm2_State are cross VM states checked
        if condor_status['vm2_State']!='Claimed':
            raise RuntimeError("Job cannot be yet monitored")
        if condor_status.get('vm2_Activity')=='Retiring':
            raise RuntimeError("Job cannot be monitored anymore")

    return
def createSubmitFile(work_dir,sfile,mlog,
                     mfname,mfout,mferr,
                     monitorVM,timeout,x509_file=None):
    """Write the Condor submit description of the monitoring job to sfile.

    The vanilla-universe job:
      - runs mfname with work_dir as initialdir, writing mfout/mferr/mlog;
      - is tagged +GLIDEIN_Is_Monitor=True;
      - is restricted to the slot named monitorVM through Requirements;
      - gets a periodic_remove that fires timeout+30 seconds after this call.
    When x509_file is not None it is passed as x509userproxy.
    """
    out=open(sfile,"w")
    try:
        content=["universe=vanilla\n",
                 "executable=%s\n"%mfname,
                 "initialdir=%s\n"%work_dir,
                 "output=%s\n"%mfout,
                 "error=%s\n"%mferr,
                 "log=%s\n"%mlog,
                 "transfer_executable=True\n",
                 "when_to_transfer_output=ON_EXIT\n",
                 "notification=Never\n",
                 "+GLIDEIN_Is_Monitor=True\n",
                 "+Owner=Undefined\n"]
        if x509_file is not None:
            content.append('x509userproxy = %s\n'%x509_file)
        content.append('Requirements=(Name=?="%s")&&(Arch=!="Absurd")\n'%monitorVM)
        # karakiri: let Condor remove the job shortly after the caller gives up
        content.append("periodic_remove=(CurrentTime>%li)\n"%(int(time.time())+timeout+30))
        content.append("queue\n")
        out.writelines(content)
    finally:
        out.close()
def checkFile(fname,schedd_name,pool_name,
              timeout,reschedule_freq):
    """Wait for fname to appear, nudging the schedd while waiting.

    The file is checked once per second.  Whenever reschedule_freq seconds
    have passed since the last nudge, condorManager.condorReschedule is
    called.  Returns True as soon as fname exists; raises RuntimeError once
    `timeout` seconds have elapsed without it appearing.
    """
    deadline=time.time()+timeout
    previous_nudge=time.time()
    while time.time()<deadline:
        since_nudge=time.time()-previous_nudge
        if since_nudge>=reschedule_freq:
            condorManager.condorReschedule(schedd_name,pool_name)
            previous_nudge=time.time()
        time.sleep(1)
        if os.path.exists(fname):
            return True
    raise RuntimeError("Command did not reply within timeout (%ss)"%timeout)
def printFile(fname,outfd):
    """Copy the whole content of the file fname to the writable object outfd.

    The file is read in one go and emitted with a single outfd.write call.
    The input file is closed even if reading or writing fails.
    """
    src=open(fname)
    try:
        outfd.write(src.read())
    finally:
        src.close()