Update README for archival
[reddit.git] / scripts / hashdist.py
blob fc92bc9fb95c59561178885411ccb900d261ec5e
1 #!/usr/bin/env python2.7
from Queue import Queue
import argparse
import errno
import logging
import multiprocessing
import os
import re
import string
import subprocess
import sys
import threading
def parse_size(s):
    """
    Parse a size string such as ``"512"``, ``"10b"``, ``"4k"``, ``"2m"``, ``"1g"``.

    A string made only of digits is returned as an int unchanged. Otherwise the
    last character is a unit suffix (case-insensitive): ``b`` multiplies the
    leading integer by 1, ``k`` by 1024, ``m`` by 1024**2 and ``g`` by 1024**3.

    Raises ValueError when *s* cannot be parsed. ValueError (rather than a
    bare Exception) is what argparse turns into a clean "invalid value" usage
    error when this is used as an argument ``type=``.
    """
    multipliers = {'b': 1, 'k': 1024, 'm': 1024 ** 2, 'g': 1024 ** 3}

    # the `s and` guard stops "" from slipping through all() and hitting int('')
    if s and all(c in string.digits for c in s):
        return int(s)

    suffix = s[-1:].lower()
    if suffix in multipliers:
        try:
            return int(s[:-1]) * multipliers[suffix]
        except ValueError:
            # e.g. "k" alone or "1.5k": fall through to the uniform error
            pass

    raise ValueError("Can't parse %r" % (s,))
class JobInputter(threading.Thread):
    """
    Feeds one child process from its input queue.

    Lines originally read from our stdin arrive on ``iq``. Each one is written
    to the child's stdin and flushed straight away. A ``None`` item is the
    end-of-input marker: the child's stdin is closed and the thread finishes.

    Every item handled is acknowledged with ``iq.task_done()``, so a caller can
    block on ``iq.join()`` until the queue has drained. If writing to the
    child raises IOError, the error is logged and the whole program is
    hard-killed with ``os._exit(1)``.
    """

    def __init__(self, job_name, popen, iq):
        super(JobInputter, self).__init__()
        self.job_name, self.popen, self.iq = job_name, popen, iq

    def __repr__(self):
        return "<%s %s>" % (type(self).__name__, self.job_name)

    def run(self):
        inbox = self.iq
        while True:
            item = inbox.get()
            logging.debug("%r got item %r", self, item)

            if item is None:
                # terminator: closing the child's stdin gives it EOF
                logging.debug("%r closing %r", self, self.popen.stdin)
                self.popen.stdin.close()
                inbox.task_done()
                return

            try:
                pipe = self.popen.stdin
                pipe.write(item)
                pipe.flush()
                inbox.task_done()
            except IOError:
                logging.exception("exception writing to popen %r", self.popen)
                return os._exit(1)
class JobOutputter(threading.Thread):
    """
    Takes output from one child process and sends it to ``out_fd`` (stdout).

    Each line read from the child's stdout is written while holding ``lock``,
    so whole lines from different children never interleave on the output.

    If the write fails with EPIPE (our reader has gone away), the error is
    ignored and the loop keeps reading the child's output. Any other IOError
    on the output is logged and the whole program is hard-killed with
    ``os._exit(1)``. The child's stdout pipe is closed once it reaches EOF.
    """

    def __init__(self, job_name, popen, out_fd, lock):
        self.job_name = job_name
        self.popen = popen
        self.out_fd = out_fd
        self.lock = lock
        super(JobOutputter, self).__init__()

    def __repr__(self):
        return "<%s %s>" % (self.__class__.__name__, self.job_name)

    def run(self):
        for line in self.popen.stdout:
            logging.debug("%r read %d bytes", self, len(line))
            with self.lock:
                try:
                    self.out_fd.write(line)
                except IOError as e:
                    # EPIPE is tolerated and we keep consuming the child's
                    # output (presumably so the child never blocks on a full
                    # pipe); requires `errno` to be imported at module level.
                    if e.errno != errno.EPIPE:
                        logging.exception("exception writing to output %r", self.out_fd)
                        return os._exit(1)

        logging.debug("Got eof on %r", self)
        # nothing else reads this pipe after EOF; release the descriptor
        self.popen.stdout.close()
def hash_select(key, choices):
    """
    Pick the element of *choices* that *key* maps to.

    The index is ``hash(key)`` reduced modulo ``len(choices)``, so within a
    single process the same key always selects the same element.
    """
    slot = hash(key) % len(choices)
    return choices[slot]
def main():
    """
    Script entry point: run ``_main`` and make ^C an immediate hard exit.

    The work is spread across several worker threads, so a KeyboardInterrupt
    is turned into ``os._exit(1)``, which skips all normal cleanup, instead of
    being allowed to propagate.
    """
    try:
        outcome = _main()
    except KeyboardInterrupt:
        os._exit(1)
    else:
        return outcome
def _main():
    """
    Run hashdist: fan stdin out to ``-n`` copies of ``cmd`` by key hash.

    Key extraction:
        Each stdin line's key is its first field (the text before the first
        ``-f`` separator, tab by default). With ``-r``, the key is instead the
        text that regex matches at the start of the line. Lines whose key
        cannot be extracted all get the empty key.

    Routing:
        Lines with equal keys are always sent to the same child process, and
        the children's stdout lines are copied to our stdout. With ``-n 1``
        the command is simply exec'd in place of this process.

    Exit status (always via ``sys.exit``):
        0 once every child's output has been copied.
        1 when ``-f`` and ``-r`` are both given.
        2 (argparse) for other usage errors.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('-n', metavar='N', type=int,
                        default=multiprocessing.cpu_count(), dest='nprocs')
    parser.add_argument('-b', '--buffer', metavar='N', type=parse_size,
                        help="size (in lines) of input buffer for each process",
                        default=1024,
                        dest='bufsize')
    # The effective default for -f is a tab, but it is filled in after
    # parsing so an explicit -f can be told apart from the default. With a
    # literal '\t' default, every use of -r tripped the conflict check below.
    parser.add_argument('-f', metavar='FIELDSEP', type=str, default=None,
                        dest='field_sep')
    parser.add_argument('-r', metavar='FIELDRE', type=str, default=None,
                        dest='field_re')
    parser.add_argument('--logging', help=argparse.SUPPRESS, default='error')
    parser.add_argument('cmd', nargs='+')

    args = parser.parse_args()

    if args.field_re and args.field_sep is not None:
        # -f and -r are mutually exclusive. Usage goes to stderr so it is
        # never mixed into the data stream on our stdout.
        parser.print_usage(sys.stderr)
        return sys.exit(1)
    if args.field_sep is None:
        args.field_sep = '\t'

    if args.nprocs < 1:
        # with no children there is nothing to hash lines onto
        parser.error("-n must be at least 1")

    if args.nprocs == 1:
        # if you only want one, what do you need me for?
        os.execvp(args.cmd[0], args.cmd)
        return sys.exit(1)  # will never get here

    if args.field_re:
        first_field_re = re.compile(args.field_re)
    else:
        first_field_re = re.compile('^([^' + re.escape(args.field_sep) + ']+)')

    logging.basicConfig(level=getattr(logging, args.logging.upper()))

    stdout_mutex = threading.Lock()
    processes = []

    for x in range(args.nprocs):
        logging.debug("Starting %r (%d)", args.cmd, x)
        ps = subprocess.Popen(args.cmd,
                              stdin=subprocess.PIPE,
                              stdout=subprocess.PIPE)
        psi = JobInputter(x, ps, Queue(maxsize=args.bufsize))
        pso = JobOutputter(x, ps, sys.stdout, stdout_mutex)
        psi.start()
        pso.start()
        processes.append((psi, pso))

    for line in sys.stdin:
        logging.debug("Read %d bytes from stdin", len(line))

        first_field_m = first_field_re.match(line)
        # A line with no extractable key (it starts with the separator, or
        # the -r pattern doesn't match) used to crash on None.group(0). Give
        # all such lines the empty key so they are still routed consistently.
        first_field = first_field_m.group(0) if first_field_m else ''
        psi, _pso = hash_select(first_field, processes)
        logging.debug("Writing %d bytes to %r (%r)", len(line), psi, first_field)
        psi.iq.put(line)

    logging.debug("Hit eof on stdin")

    # a None item makes each JobInputter close its child's stdin
    for x, (psi, pso) in enumerate(processes):
        logging.debug("Sending terminator to %d (%r)", x, psi)
        psi.iq.put(None)

    for x, (psi, pso) in enumerate(processes):
        logging.debug("Waiting for q %d (%r)", x, psi)
        psi.iq.join()
        logging.debug("Waiting for psi %d (%r)", x, psi)
        psi.join()
        logging.debug("Waiting for pso %d (%r)", x, psi)
        pso.join()

    return sys.exit(0)
if __name__ == "__main__":
    # run only when executed as a script, not when imported as a module
    main()