Force a non-lazy run if global.ini has changed
[puppet-git.git] / flock_multi
blobf401279a29bf8e736b1f99595b66127db88f4598
1 #!/usr/bin/env python
3 # Author: Martin Langhoff <martin.langhoff@remote-learner.net>
4 # License: GPLv2
7 from __future__ import with_statement
9 import os, sys
10 import traceback
11 import getopt
12 import fcntl
13 import re
14 import random
15 import subprocess
16 import time
def help():
    """Return the command-line usage summary as a single string.

    The text ends with a newline so it can be written directly to a
    stream with ``write()``.
    """
    pieces = (
        "Usage:\n",
        " flock_multi [-h] [-v] [-E 200] [-T 201] [-s 20] [-w 60m ] heavy 4 heavyscript \n",
        "Notes: \n",
        " -w accepts m and h suffixes\n",
    )
    return "".join(pieces)
def arg_to_secs(a, exitcode=200):
    """Convert a duration argument into a whole number of seconds.

    Accepted forms are an integer, a string of digits (seconds), or a
    string of digits followed by ``m`` (minutes) or ``h`` (hours).

    Args:
        a: the duration, e.g. ``90``, ``"90"``, ``"15m"`` or ``"2h"``.
        exitcode: process exit status used when ``a`` cannot be parsed.
            This is a parameter (defaulting to 200) because the exit
            codes chosen on the command line are locals of main() and
            are not visible from this function.

    Returns:
        The duration in seconds, as an int.

    Raises:
        SystemExit: with ``exitcode``, after writing an error message to
            stderr, when ``a`` is a string in none of the accepted forms.
    """
    # numbers.Integral covers int (and long on Python 2) without naming
    # `long`, which does not exist on Python 3.
    import numbers

    if isinstance(a, numbers.Integral):
        return int(a)

    seconds_per_unit = {'': 1, 'm': 60, 'h': 60 * 60}
    m = re.match(r'(\d+)([mh]?)$', a)
    if m:
        return int(m.group(1)) * seconds_per_unit[m.group(2)]

    sys.stderr.write("ERROR: timeout parameter not an integer!\n")
    sys.exit(exitcode)
def maybe_timeout(timeout, exitcode):
    """Terminate the process if the deadline ``timeout`` has passed.

    Args:
        timeout: deadline compared against ``time.time()``; a value of
            zero or less disables the check.
        exitcode: status passed to ``sys.exit()`` when the deadline has
            passed.

    Raises:
        SystemExit: with ``exitcode``, after writing an error to stderr,
            when ``timeout`` is positive and earlier than the current time.
    """
    if not timeout > 0:
        # no deadline configured
        return
    if time.time() > timeout:
        sys.stderr.write("ERROR: flock_multi timeout\n")
        sys.exit(exitcode)
def maybe_remove_qmonfile():
    """Delete the file named by the module global ``qmonfname``, if any.

    Does nothing when ``qmonfname`` is unset/empty or the path does not
    exist. A failure to delete is treated as a soft error: the traceback
    is printed to stderr and execution continues.
    """
    global qmonfname
    if not qmonfname:
        return
    if not os.path.exists(qmonfname):
        return
    try:
        os.remove(qmonfname)
    except:
        # soft error - report it, but never let cleanup abort the caller
        traceback.print_exc(file=sys.stderr)
def main():
    """Run a command while holding one of N numbered flock() lock files.

    Command line: ``[options] LOCKNAME MAXLOCKS COMMAND [ARG...]``.

    The lock files are ``<lockdir>/<LOCKNAME>.1`` .. ``.<MAXLOCKS>``. They
    are tried in random order with a non-blocking exclusive flock; the
    first one obtained is held while COMMAND runs. If none is free the
    process sleeps (sleeptime +/- 10% splay) and retries, until the
    optional deadline passes.

    Environment overrides:
        FLOCK_MULTI_CONF_DIR: directory of per-lockname files whose
            integer content overrides MAXLOCKS
            (default /mnt/cluster/conf/lock).
        FLOCK_MULTI_DIR: directory holding the lock files
            (default /mnt/cluster/lock).
        FLOCK_MULTI_QMON_DIR: directory for queue-monitor marker files
            (default <lockdir>/queuemonitor).

    Options:
        -h/--help, -v/--verbose, -Q/--queuemonitor/--qmon,
        -E/--conflict-exit-code N, -T/--timeout-exit-code N,
        -s/--sleeptime SECS, -w/--wait/--timeout DURATION.

    Exit status (always via SystemExit):
        200 on invalid options or too few arguments; the conflict exit
        code when COMMAND does not exist; the timeout exit code when the
        deadline passes; otherwise the exit status of COMMAND.
    """
    import errno

    global qmonfname

    # vars overridden from env
    confdir = os.environ.get('FLOCK_MULTI_CONF_DIR', '/mnt/cluster/conf/lock')
    lockdir = os.environ.get('FLOCK_MULTI_DIR', '/mnt/cluster/lock')
    qmondir = os.environ.get('FLOCK_MULTI_QMON_DIR',
                             os.path.join(lockdir, 'queuemonitor'))

    try:
        long_opts = ["help", "verbose", "queuemonitor", "qmon", "conflict-exit-code=",
                     "timeout-exit-code=", "sleeptime=", "wait=", "timeout="]
        opts, args = getopt.getopt(sys.argv[1:], "hvQE:T:s:w:", long_opts)
    except getopt.GetoptError as e:
        sys.stderr.write("ERROR: Invalid parameter: %s\n" % e.msg)
        sys.stderr.write(help())
        sys.exit(200)

    verbose = False
    qmon = False
    flockerr = 200
    timeouterr = 201
    sleeptime = 60
    timeout = 0

    # Options are handled before the positional-argument count is checked
    # so that a bare "-h" prints the usage instead of an error.
    for o, a in opts:
        if o in ("-v", "--verbose"):
            verbose = True
        elif o in ("-Q", "--queuemonitor", "--qmon"):
            qmon = True
        elif o in ("-h", "--help"):
            sys.stdout.write(help())
            sys.exit()
        elif o in ("-E", "--conflict-exit-code"):
            flockerr = int(a)
        elif o in ("-T", "--timeout-exit-code"):
            timeouterr = int(a)
        elif o in ("-s", "--sleeptime"):
            sleeptime = int(a)
        elif o in ("-w", "--wait", "--timeout"):
            # stored as an absolute deadline (epoch seconds)
            timeout = float(arg_to_secs(a)) + time.time()
        else:
            assert False, "unhandled option %s" % o

    if len(args) < 3:
        sys.stderr.write("ERROR: At least 3 parameters needed.\n")
        sys.stderr.write(help())
        sys.exit(200)

    # argument params
    lockname = args.pop(0)
    maxlocks = int(args.pop(0))
    cmd = args

    # A readable integer in <confdir>/<lockname> overrides the
    # command-line maxlocks; anything else is ignored with a warning.
    conffile = os.path.join(confdir, lockname)
    if os.path.exists(conffile):
        try:
            with open(conffile) as conffh:
                maxlocks = int(conffh.read())
        except (EnvironmentError, ValueError):
            sys.stderr.write("WARNING: Ignoring invalid value in %s\n" % conffile)

    if verbose:
        print("Using %s maxlocks" % maxlocks)

    mypid = os.getpid()
    hostname = os.uname()[1]

    # cast to have better splay
    sleeptime = float(sleeptime)

    if qmon:
        try:
            qmonfname = os.path.join(qmondir, '%s:%s:%s' % (lockname, hostname, mypid))
            open(qmonfname, 'w').close()  # "touch"
            if verbose:
                print("qmonfile %s" % qmonfname)
        except EnvironmentError:
            # soft error
            traceback.print_exc(file=sys.stderr)

    while True:
        # list() so the sequence can be shuffled even where range() is lazy
        locks = list(range(1, maxlocks + 1))
        random.shuffle(locks)
        for trylock in locks:
            trylockfn = os.path.join(lockdir, lockname + '.%s' % trylock)
            # we open for "append", and only move to truncate the
            # file if we succeed in getting the flock
            with open(trylockfn, 'a') as fh:
                try:
                    fcntl.flock(fh, fcntl.LOCK_EX | fcntl.LOCK_NB)
                except IOError:
                    # failed to get lock, nonfatal - try the next one.
                    # Only the flock call is guarded, so a later I/O
                    # error cannot be mistaken for "lock busy".
                    continue

                try:
                    fh.seek(0)
                    fh.truncate(0)
                    fh.flush()
                    if verbose:
                        print("Got %s" % trylockfn)
                    # record who holds the lock, and what they are running
                    t = time.gmtime()
                    fh.write("%s PID: %s epoch: %s %s\n" %
                             (hostname, mypid, time.time(),
                              time.strftime("%a, %d %b %Y %H:%M:%S +0000", t)))
                    fh.write(' '.join(cmd))
                    fh.write("\n")
                    fh.flush()
                    maybe_remove_qmonfile()
                    # execute the command requested
                    try:
                        cmdexit = subprocess.call(cmd)
                    except OSError as e:
                        if e.errno == errno.ENOENT:
                            sys.stderr.write("ERROR: No such file or directory: %s\n" % cmd[0])
                            sys.exit(flockerr)
                        raise
                finally:
                    # runs on _all_ exits from the locked section,
                    # including SystemExit and KeyboardInterrupt:
                    # truncate flock'd file on completion
                    fh.seek(0)
                    fh.truncate(0)
                    fh.flush()

            # only reached when the lock was obtained and the command ran
            sys.exit(cmdexit)

        # all locks taken
        maybe_timeout(timeout, timeouterr)

        splay = sleeptime / 10
        actual_sleep = sleeptime + random.uniform(0 - splay, splay)
        if verbose:
            print("Tried all locks - sleeping %s" % actual_sleep)
        time.sleep(actual_sleep)
        maybe_timeout(timeout, timeouterr)
if __name__ == '__main__':
    # Module-level path of the queue-monitor marker file; main() sets it
    # when queue monitoring is requested, and maybe_remove_qmonfile()
    # reads it.
    qmonfname = None
    try:
        main()
    except KeyboardInterrupt:  # user hit control-C
        sys.exit(130)
    except Exception:  # all "interesting" exceptions, but not SystemExit
        traceback.print_exc(file=sys.stdout)
        # sys.exit rather than the bare exit() helper, which is injected
        # by the site module and is not guaranteed to exist
        sys.exit(200)
    finally:
        # always clean up the marker file, whatever the exit path
        maybe_remove_qmonfile()