1 #!/usr/bin/env python2.7
3 from Queue
import Queue
17 return int(s
[:-1])*multiplier
19 if all(x
in string
.digits
for x
in s
):
26 return mult(1024*1024)
28 return mult(1024*1024*1024)
29 raise Exception("Can't parse %r" % (s
,))
32 class JobInputter(threading
.Thread
):
34 Takes input originally from stdin through iq and sends it to the job
36 def __init__(self
, job_name
, popen
, iq
):
37 self
.job_name
= job_name
40 super(JobInputter
, self
).__init
__()
43 return "<%s %s>" % (self
.__class
__.__name
__, self
.job_name
)
48 logging
.debug("%r got item %r", self
, item
)
50 logging
.debug("%r closing %r", self
, self
.popen
.stdin
)
51 self
.popen
.stdin
.close()
56 self
.popen
.stdin
.write(item
)
57 self
.popen
.stdin
.flush()
60 logging
.exception("exception writing to popen %r", self
.popen
)
64 class JobOutputter(threading
.Thread
):
66 Takes output from the job and sends it to stdout
68 def __init__(self
, job_name
, popen
, out_fd
, lock
):
69 self
.job_name
= job_name
73 super(JobOutputter
, self
).__init
__()
76 return "<%s %s>" % (self
.__class
__.__name
__, self
.job_name
)
79 for line
in self
.popen
.stdout
:
80 logging
.debug("%r read %d bytes", self
, len(line
))
83 self
.out_fd
.write(line
)
85 if e
.errno
!= errno
.EPIPE
:
86 logging
.exception("exception writing to output %r", self
.out_fd
)
89 logging
.debug("Got eof on %r", self
)
def hash_select(key, choices):
    """Pick one element of *choices* based on the hash of *key*.

    Within a single interpreter process, equal keys always map to the same
    index, so repeated calls with the same key and the same *choices*
    sequence return the same element.

    key     -- any hashable object.
    choices -- a non-empty, indexable sequence.

    Returns the element at index ``hash(key) % len(choices)``.
    Raises ValueError if *choices* is empty, and TypeError if *key* is
    unhashable.
    """
    if not choices:
        # Without this guard the modulo below fails with an opaque
        # ZeroDivisionError that says nothing about the real cause.
        raise ValueError("hash_select() requires at least one choice")
    # Python's % always yields a result in [0, len(choices)) for a positive
    # divisor, so negative hash values still give a valid index.
    return choices[hash(key) % len(choices)]
99 except KeyboardInterrupt:
100 # because we mess with threads a lot, we need to make sure that ^C is
101 # actually a nuclear kill
105 parser
= argparse
.ArgumentParser()
106 parser
.add_argument('-n', metavar
='N', type=int,
107 default
=multiprocessing
.cpu_count(), dest
='nprocs')
108 parser
.add_argument('-b', '--buffer', metavar
='N', type=parse_size
,
109 help="size (in lines) of input buffer for each process",
112 parser
.add_argument('-f', metavar
='FIELDSEP', type=str, default
='\t',
114 parser
.add_argument('-r', metavar
='FIELDRE', type=str, default
=None,
116 parser
.add_argument('--logging', help=argparse
.SUPPRESS
, default
='error')
117 parser
.add_argument('cmd', nargs
='+')
119 args
= parser
.parse_args()
121 if args
.field_re
and args
.field_sep
:
126 # if you only want one, what do you need me for?
127 os
.execvp(args
.cmd
[0], args
.cmd
)
128 return sys
.exit(1) # will never get here
131 first_field_re
= re
.compile(args
.field_re
)
133 first_field_re
= re
.compile('^([^'+re
.escape(args
.field_sep
)+']+)')
135 logging
.basicConfig(level
=getattr(logging
, args
.logging
.upper()))
137 stdout_mutex
= threading
.Lock()
140 for x
in range(args
.nprocs
):
141 logging
.debug("Starting %r (%d)", args
.cmd
, x
)
142 ps
= subprocess
.Popen(args
.cmd
,
143 stdin
=subprocess
.PIPE
,
144 stdout
=subprocess
.PIPE
)
145 psi
= JobInputter(x
, ps
, Queue(maxsize
=args
.bufsize
))
146 pso
= JobOutputter(x
, ps
, sys
.stdout
, stdout_mutex
)
149 processes
.append((psi
, pso
))
151 for line
in sys
.stdin
:
155 logging
.debug("Read %d bytes from stdin", len(line
))
157 first_field_m
= first_field_re
.match(line
)
158 first_field
= first_field_m
.group(0)
159 psi
, _pso
= hash_select(first_field
, processes
)
160 logging
.debug("Writing %d bytes to %r (%r)", len(line
), psi
, first_field
)
163 logging
.debug("Hit eof on stdin")
165 for x
, (psi
, pso
) in enumerate(processes
):
166 logging
.debug("Sending terminator to %d (%r)", x
, psi
)
169 for x
, (psi
, pso
) in enumerate(processes
):
170 logging
.debug("Waiting for q %d (%r)", x
, psi
)
172 logging
.debug("Waiting for psi %d (%r)", x
, psi
)
174 logging
.debug("Waiting for pso %d (%r)", x
, psi
)
180 if __name__
== '__main__':