3 # Author: Martin Langhoff <martin.langhoff@remote-learner.net>
7 from __future__
import with_statement
20 " flock_multi [-h] [-v] [-E 200] [-T 201] [-s 20] [-w 60m ] heavy 4 heavyscript \n" \
22 " -w accepts m and h suffixes\n"
25 if isinstance(a
, (int, long)) or re
.match('\d+$', a
):
27 m
= re
.match('(\d+)m$', a
)
29 return (int(m
.group(1)) * 60)
30 m
= re
.match('(\d+)h$', a
)
32 return (int(m
.group(1)) * 60 * 60)
33 sys
.stderr
.write("ERROR: timeout parameter not an integer!\n")
# Deadline check: compares `timeout` against the current time.time() and,
# when the deadline is already in the past, writes
# "ERROR: flock_multi timeout" to stderr.
# `timeout` is an absolute epoch value - the option handling for -w/--wait
# builds it as float(seconds) + time.time().
# NOTE(review): `exitcode` is not used in the lines visible here; presumably
# the process exits with it right after the message - confirm.
36 def maybe_timeout(timeout
, exitcode
):
38 if timeout
< time
.time():
39 sys
.stderr
.write("ERROR: flock_multi timeout\n")
# Queue-monitor file cleanup: only acts when `qmonfname` is set (truthy) and
# that path currently exists on disk.
# NOTE(review): the statement that removes the file is not in the visible
# lines - presumably an unlink wrapped in try/except; confirm.
42 def maybe_remove_qmonfile():
44 if qmonfname
and os
.path
.exists(qmonfname
):
# Report the active exception's traceback on stderr.
49 traceback
.print_exc(file=sys
.stderr
)
# Directory settings; each default can be overridden from the environment.
# os.environ.get() does the lookup and the fallback in a single step.

# Directory holding per-lock config files (FLOCK_MULTI_CONF_DIR).
confdir = os.environ.get('FLOCK_MULTI_CONF_DIR', '/mnt/cluster/conf/lock')

# Directory holding the lock files themselves (FLOCK_MULTI_DIR).
lockdir = os.environ.get('FLOCK_MULTI_DIR', '/mnt/cluster/lock')

# Queue-monitor directory (FLOCK_MULTI_QMON_DIR). Its default lives under
# the lock directory, so it follows a FLOCK_MULTI_DIR override as well.
qmondir = os.environ.get('FLOCK_MULTI_QMON_DIR',
                         os.path.join(lockdir, 'queuemonitor'))
66 long_opts
= ["help", "verbose", "queuemonitor", "qmon", "conflict-exit-code=",
67 "timeout-exit-code=", "sleeptime=", "wait=", "timeout="]
68 opts
, args
= getopt
.getopt(sys
.argv
[1:], "hvQE:T:s:w:", long_opts
)
69 except getopt
.GetoptError
, e
:
70 sys
.stderr
.write("ERROR: Invalid parameter: %s\n" % e
[0])
71 sys
.stderr
.write(help())
75 sys
.stderr
.write("ERROR: At least 3 parameters needed.\n")
76 sys
.stderr
.write(help())
87 if o
in ("-v", "--verbose") :
89 elif o
in ("-Q", "--queuemonitor", "--qmon") :
91 elif o
in ("-h", "--help"):
94 elif o
in ("-E", "--conflict-exit-code"):
96 elif o
in ("-T", "--timeout-exit-code"):
98 elif o
in ("-s", "--sleeptime"):
100 elif o
in ("-w", "--wait", "--timeout"):
101 timeout
= float(arg_to_secs(a
)) + time
.time()
103 assert False, "unhandled option %s" % o
105 lockname
= args
.pop(0)
106 maxlocks
= int(args
.pop(0))
109 conffile
= os
.path
.join(confdir
, lockname
)
111 if os
.path
.exists(conffile
):
112 tmpval
= int(open(conffile
).read())
115 sys
.stderr
.write("WARNING: Ignoring invalid value in %s\n" % conffile
)
118 print "Using %s maxlocks" % maxlocks
121 hostname
= os
.uname()[1]
125 # cast to float so the splay (sleeptime / 10) is not truncated by integer division
126 sleeptime
= float(sleeptime
)
132 qmonfname
= os
.path
.join(qmondir
, '%s:%s:%s' % (lockname
, hostname
, mypid
))
133 open(qmonfname
, 'w').close() # "touch"
135 print "qmonfile %s" % qmonfname
138 traceback
.print_exc(file=sys
.stderr
)
141 locks
= range(1, maxlocks
+1)
143 random
.shuffle(locks
)
144 for trylock
in locks
:
145 # print "trying %s" %trylock
146 trylockfn
= os
.path
.join(lockdir
, lockname
+ '.%s' % trylock
)
147 # we open for "append", and only move to truncate the
148 # file if we succeed in getting the flock
149 with
open(trylockfn
, 'a') as fh
:
151 fcntl
.flock(fh
, fcntl
.LOCK_EX|fcntl
.LOCK_NB
)
157 print "Got %s" % trylockfn
159 fh
.write("%s PID: %s epoch: %s %s\n" %
160 (hostname
, mypid
, time
.time(), time
.strftime("%a, %d %b %Y %H:%M:%S +0000", t
)))
161 fh
.write(' '.join(cmd
))
164 maybe_remove_qmonfile()
165 # execute the command requested
166 cmdexit
= subprocess
.call(cmd
)
167 except IOError: # failed to get lock, nonfatal
171 sys
.stderr
.write("ERROR: No such file or directory: %s\n" % cmd
[0])
176 # runs on _all_ exceptions - IOError, OSError and KeyboardInterrupt
177 # truncate flock'd file on completion
187 maybe_timeout(timeout
, timeouterr
)
189 splay
= sleeptime
/ 10
190 actual_sleep
= sleeptime
+ random
.uniform(0 - splay
, splay
)
192 print "Tried all locks - sleeping %s" % actual_sleep
193 time
.sleep(actual_sleep
)
194 maybe_timeout(timeout
, timeouterr
)
196 if __name__
== '__main__':
200 except KeyboardInterrupt: # user hit control-C
202 except Exception: # all "interesting" exceptions, but not SystemExit
203 traceback
.print_exc(file=sys
.stdout
)
206 maybe_remove_qmonfile()