Use py_resource module
[python/dscho.git] / Demo / threads / find.py
blob940360abdff2dd73247714824018f60a9b69434b
1 # A parallelized "find(1)" using the thread module (SGI only for now).
3 # This demonstrates the use of a work queue and worker threads.
4 # It really does do more stats/sec when using multiple threads,
5 # although the improvement is only about 20-30 percent.
7 # I'm too lazy to write a command line parser for the full find(1)
8 # command line syntax, so the predicate it searches for is wired-in,
9 # see function selector() below. (It currently searches for files with
10 # group or world write permission.)
12 # Usage: parfind.py [-w nworkers] [directory] ...
13 # Default nworkers is 4, maximum appears to be 8 (on Irix 4.0.2)
16 import sys
17 import getopt
18 import string
19 import time
20 import os
21 from stat import *
22 import thread
25 # Work queue class. Usage:
26 # wq = WorkQ()
27 # wq.addwork(func, (arg1, arg2, ...)) # one or more calls
28 # wq.run(nworkers)
29 # The work is done when wq.run() completes.
30 # The function calls executed by the workers may add more work.
31 # Don't use keyboard interrupts!
33 class WorkQ:
35 # Invariants:
37 # - busy and work are only modified when mutex is locked
38 # - len(work) is the number of jobs ready to be taken
39 # - busy is the number of jobs being done
40 # - todo is locked iff there is no work and somebody is busy
42 def __init__(self):
43 self.mutex = thread.allocate()
44 self.todo = thread.allocate()
45 self.todo.acquire()
46 self.work = []
47 self.busy = 0
49 def addwork(self, func, args):
50 job = (func, args)
51 self.mutex.acquire()
52 self.work.append(job)
53 self.mutex.release()
54 if len(self.work) == 1:
55 self.todo.release()
57 def _getwork(self):
58 self.todo.acquire()
59 self.mutex.acquire()
60 if self.busy == 0 and len(self.work) == 0:
61 self.mutex.release()
62 self.todo.release()
63 return None
64 job = self.work[0]
65 del self.work[0]
66 self.busy = self.busy + 1
67 self.mutex.release()
68 if len(self.work) > 0:
69 self.todo.release()
70 return job
72 def _donework(self):
73 self.mutex.acquire()
74 self.busy = self.busy - 1
75 if self.busy == 0 and len(self.work) == 0:
76 self.todo.release()
77 self.mutex.release()
79 def _worker(self):
80 while 1:
81 job = self._getwork()
82 if not job:
83 break
84 func, args = job
85 apply(func, args)
86 self._donework()
88 def run(self, nworkers):
89 if not self.work:
90 return # Nothing to do
91 for i in range(nworkers-1):
92 thread.start_new(self._worker, ())
93 self._worker()
94 self.todo.acquire()
97 # Main program
99 def main():
100 nworkers = 4
101 opts, args = getopt.getopt(sys.argv[1:], '-w:')
102 for opt, arg in opts:
103 if opt == '-w':
104 nworkers = string.atoi(arg)
105 if not args:
106 args = [os.curdir]
108 wq = WorkQ()
109 for dir in args:
110 wq.addwork(find, (dir, selector, wq))
112 t1 = time.time()
113 wq.run(nworkers)
114 t2 = time.time()
116 sys.stderr.write('Total time ' + `t2-t1` + ' sec.\n')
119 # The predicate -- defines what files we look for.
120 # Feel free to change this to suit your purpose
122 def selector(dir, name, fullname, stat):
123 # Look for group or world writable files
124 return (stat[ST_MODE] & 0022) != 0
127 # The find procedure -- calls wq.addwork() for subdirectories
129 def find(dir, pred, wq):
130 try:
131 names = os.listdir(dir)
132 except os.error, msg:
133 print `dir`, ':', msg
134 return
135 for name in names:
136 if name not in (os.curdir, os.pardir):
137 fullname = os.path.join(dir, name)
138 try:
139 stat = os.lstat(fullname)
140 except os.error, msg:
141 print `fullname`, ':', msg
142 continue
143 if pred(dir, name, fullname, stat):
144 print fullname
145 if S_ISDIR(stat[ST_MODE]):
146 if not os.path.ismount(fullname):
147 wq.addwork(find, (fullname, pred, wq))
150 # Call the main program
# Run the search only when executed as a script, so importing this module
# (e.g. to reuse WorkQ) does not start walking the filesystem.
if __name__ == '__main__':
    main()