#
# $Id: glideinMonitor.py,v 1.21 2011/02/10 21:35:32 parag Exp $
#
# Description:
#   This module implements helper functions
#   used to perform pseudo-interactive monitoring.
#
# Requirements:
#   The startd must be configured with exactly 2 slots, called vm.
#   It must have the cross-vm expressions State and RemoteUser enabled.
#   It must also advertise that it has the monitor vm.
#
# Author:
#   Igor Sfiligoi (May 2007)
#

import sys,os,os.path,string
import time
import tempfile,shutil
import condorMonitor,condorManager

# This should be done by the user of the module
#sys.path.append("../../lib")
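# For example, a caller script would do something like the sketch below;
# the relative path and the createMonitorFile callback are placeholders
# the caller must provide, not names defined by this module.
#   sys.path.append("../../lib")
#   import glideinMonitor
#   args=glideinMonitor.parseArgs(sys.argv[1:])
#   glideinMonitor.monitor(args['jid'],args['schedd_name'],args['pool_name'],
#                          args['timeout'],
#                          createMonitorFile,args['argv'])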
# returns a dictionary of jid, schedd_name, pool_name, timeout and argv;
# argv contains the arguments not parsed by the function
# (a worked example follows the function below)
def parseArgs(argv):
    outdict={'schedd_name':None,'pool_name':None,
             'timeout':130} #default
    # option names follow the usual Condor conventions (-name/-pool/-timeout)
    jid=None
    i=0
    while i<len(argv):
        if argv[i]=='-name':
            i=i+1
            outdict['schedd_name']=argv[i]
        elif argv[i]=='-pool':
            i=i+1
            outdict['pool_name']=argv[i]
        elif argv[i]=='-timeout':
            i=i+1
            outdict['timeout']=int(argv[i])
        elif jid==None:
            jid=argv[i]
        else:
            break # first unknown element
                  # return the rest to the caller
        i=i+1

    if jid==None:
        raise RuntimeError, 'JID not found'
    outdict['jid']=jid
    outdict['argv']=argv[i:]
    return outdict
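# Worked example (illustrative values only; the host names are placeholders):
#   parseArgs(['-name','schedd.example.org','-pool','cpool.example.org',
#              '-timeout','300','1234.0','uptime'])
# returns
#   {'schedd_name':'schedd.example.org','pool_name':'cpool.example.org',
#    'timeout':300,'jid':'1234.0','argv':['uptime']}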
# createMonitorFile is a callback with the following arguments:
#  (monitor_file_name,monitor_control_relname,argv,condor_status,monitorVM)
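# A minimal example callback (an illustrative sketch, not part of the original
# module; the function name and script contents are assumptions). It writes a
# shell script that runs the user-supplied command and then touches the
# control file, which condor transfers back on exit so checkFile() below can
# detect completion.
def exampleCreateMonitorFile(monitor_file_name,monitor_control_relname,
                             argv,condor_status,monitorVM):
    fd=open(monitor_file_name,"w")
    try:
        fd.write("#!/bin/sh\n")
        fd.write("%s\n"%string.join(argv," "))          # the command to monitor
        fd.write("touch %s\n"%monitor_control_relname)  # signal completion
    finally:
        fd.close()
    os.chmod(monitor_file_name,0755)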
def monitor(jid,schedd_name,pool_name,
            timeout,
            createMonitorFile,argv,
            stdout_fd=sys.stdout,
            stderr_fd=sys.stderr):
    try:
        jid_cluster,jid_proc=string.split(jid,".",1)
    except:
        raise RuntimeError, 'Invalid JID %s, expected Cluster.Proc'%jid
    constraint="(ClusterId=?=%s) && (ProcId=?=%s)"%(jid_cluster,jid_proc)

    remoteVM=getRemoteVM(pool_name,schedd_name,constraint)
    monitorVM=getMonitorVM(pool_name,remoteVM)

    condor_status=getMonitorVMStatus(pool_name,monitorVM)
    validateMonitorVMStatus(condor_status,monitorVM)
    if condor_status.has_key('GLEXEC_STARTER'):
        glexec_starter=condor_status['GLEXEC_STARTER']
    else:
        glexec_starter=False #if not defined, assume no gLExec for old way

    if condor_status.has_key('GLEXEC_JOB'):
        glexec_job=condor_status['GLEXEC_JOB']
    else:
        glexec_job=False #if not defined, assume no gLExec for new way

    if glexec_starter or glexec_job:
        if not os.environ.has_key('X509_USER_PROXY'):
            raise RuntimeError, "Job running on a gLExec enabled resource; X509_USER_PROXY must be defined"
        x509_file=os.environ['X509_USER_PROXY']
    else:
        x509_file=None
    tmpdir=tempfile.mkdtemp(prefix="glidein_intmon_")
    try:
        sname=os.path.join(tmpdir,"mon.submit")
        mfname=os.path.join(tmpdir,"mon.sh")
        mfout=os.path.join(tmpdir,"mon.out")
        mferr=os.path.join(tmpdir,"mon.err")
        mlog=os.path.join(tmpdir,"mon.log")
        mc_relname="mon.done"
        mcname=os.path.join(tmpdir,mc_relname)
        createMonitorFile(mfname,mc_relname,argv,condor_status,monitorVM)
        createSubmitFile(tmpdir,sname,mlog,mfname,mfout,mferr,
                         monitorVM,timeout,x509_file)
        jid=condorManager.condorSubmitOne(sname,schedd_name,pool_name)
        try:
            checkFile(mcname,schedd_name,pool_name,timeout,reschedule_freq=10)
            printFile(mfout,stdout_fd)
            printFile(mferr,stderr_fd)
        finally:
            # always remove the monitoring job from the queue
            condorManager.condorRemoveOne(jid,schedd_name,pool_name)
    finally:
        # always clean up the temporary submit directory
        shutil.rmtree(tmpdir)
######## Internal ############
def getRemoteVM(pool_name,schedd_name,constraint):
    cq=condorMonitor.CondorQ(schedd_name=schedd_name,pool_name=pool_name)
    data=cq.fetch(constraint)
    if len(data.keys())==0:
        raise RuntimeError, "Job not found"
    if len(data.keys())>1:
        raise RuntimeError, "Can handle only one job at a time"
    el=data.values()[0]
    if (not el.has_key('JobStatus')) or (el['JobStatus']!=2):
        raise RuntimeError, "Job not running"
    if not el.has_key('RemoteHost'):
        raise RuntimeError, "Job still starting"

    return el['RemoteHost']
def getMonitorVM(pool_name,jobVM):
    cs=condorMonitor.CondorStatus(pool_name=pool_name)
    data=cs.fetch(constraint='(Name=="%s")'%jobVM,
                  format_list=[('IS_MONITOR_VM','b'),('HAS_MONITOR_VM','b'),('Monitoring_Name','s')])
    if not data.has_key(jobVM):
        raise RuntimeError, "Job claims it runs on %s, but cannot find it!"%jobVM
    job_data=data[jobVM]
    if (not job_data.has_key('HAS_MONITOR_VM')) or (not job_data.has_key('IS_MONITOR_VM')):
        raise RuntimeError, "Slot %s does not support monitoring!"%jobVM
    if not (job_data['HAS_MONITOR_VM']==True):
        raise RuntimeError, "Slot %s does not support monitoring! HAS_MONITOR_VM not True."%jobVM
    if not (job_data['IS_MONITOR_VM']==False):
        raise RuntimeError, "Slot %s is a monitoring slot itself! Cannot monitor."%jobVM
    if not job_data.has_key('Monitoring_Name'):
        raise RuntimeError, "Slot %s does not publish the monitoring slot!"%jobVM

    return job_data['Monitoring_Name']
def getMonitorVMStatus(pool_name,monitorVM):
    cs=condorMonitor.CondorStatus(pool_name=pool_name)
    data=cs.fetch(constraint='(Name=="%s")'%monitorVM,
                  format_list=[('IS_MONITOR_VM','b'),('HAS_MONITOR_VM','b'),('State','s'),('Activity','s'),('vm2_State','s'),('vm2_Activity','s'),('GLEXEC_STARTER','b'),('USES_MONITOR_STARTD','b'),('GLEXEC_JOB','b')])
    if not data.has_key(monitorVM):
        raise RuntimeError, "Monitor slot %s does not exist!"%monitorVM

    return data[monitorVM]
def validateMonitorVMStatus(condor_status,monitorVM):
    if ((not condor_status.has_key('HAS_MONITOR_VM')) or
        (condor_status['HAS_MONITOR_VM']!=True)):
        raise RuntimeError, "Monitor slot %s does not allow monitoring"%monitorVM
    if not (condor_status['IS_MONITOR_VM']==True):
        raise RuntimeError, "Slot %s is not a monitoring slot!"%monitorVM

    # Since we will be queueing anyhow, do not check if it is ready right now
    #if condor_status['State']=='Claimed':
    #    raise RuntimeError, "Job cannot be monitored right now"

    if condor_status['Activity']=='Retiring':
        raise RuntimeError, "Job cannot be monitored anymore"

    if condor_status.has_key('vm2_State'):
        # cross-VM states are checked only if vm2_State is present
        if condor_status['vm2_State']!='Claimed':
            raise RuntimeError, "Job cannot be monitored yet"
        if condor_status['vm2_Activity']=='Retiring':
            raise RuntimeError, "Job cannot be monitored anymore"
def createSubmitFile(work_dir,sfile,mlog,
                     mfname,mfout,mferr,
                     monitorVM,timeout,x509_file=None):
    fd=open(sfile,"w")
    fd.write("universe=vanilla\n")
    fd.write("executable=%s\n"%mfname)
    fd.write("initialdir=%s\n"%work_dir)
    fd.write("output=%s\n"%mfout)
    fd.write("error=%s\n"%mferr)
    fd.write("log=%s\n"%mlog)
    fd.write("transfer_executable=True\n")
    fd.write("when_to_transfer_output=ON_EXIT\n")
    fd.write("notification=Never\n")
    fd.write("+GLIDEIN_Is_Monitor=True\n")
    fd.write("+Owner=Undefined\n")
    if x509_file!=None:
        fd.write('x509userproxy = %s\n'%x509_file)
    fd.write('Requirements=(Name=?="%s")&&(Arch=!="Absurd")\n'%monitorVM)
    fd.write("periodic_remove=(CurrentTime>%li)\n"%(long(time.time())+timeout+30)) # hara-kiri after timeout+delta
    fd.write("queue\n")
    fd.close()
def checkFile(fname,schedd_name,pool_name,
              timeout,reschedule_freq):
    deadline=time.time()+timeout
    last_reschedule=time.time()
    while (time.time()<deadline):
        if (time.time()-last_reschedule)>=reschedule_freq:
            # periodically nudge the schedd so the monitor job starts promptly
            condorManager.condorReschedule(schedd_name,pool_name)
            last_reschedule=time.time()
        if os.path.exists(fname):
            return True
        time.sleep(1)
    raise RuntimeError, "Command did not reply within timeout (%ss)"%timeout
def printFile(fname,outfd):
    fd=open(fname)
    try:
        outfd.write(fd.read())
    finally:
        fd.close()