* io.c (rb_open_file): encoding in mode string was ignored if perm is
[ruby-svn.git] / lib / shell / process-controller.rb
blob4f28e018adf20c3a3c7856adaed8f6df64e6d910
2 #   shell/process-controller.rb - 
3 #       $Release Version: 0.7 $
4 #       $Revision$
5 #       by Keiju ISHITSUKA(keiju@ruby-lang.org)
7 # --
9 #   
11 require "forwardable"
13 require "thread"
14 require "sync"
16 class Shell
17   class ProcessController
19     @ProcessControllers = {}
20     @ProcessControllersMonitor = Mutex.new
21     @ProcessControllersCV = ConditionVariable.new
23     @BlockOutputMonitor = Mutex.new
24     @BlockOutputCV = ConditionVariable.new
26     class<<self
27       extend Forwardable
29       def_delegator("@ProcessControllersMonitor", 
30                     "synchronize", "process_controllers_exclusive")
32       def active_process_controllers
33         process_controllers_exclusive do
34           @ProcessControllers.dup
35         end
36       end
38       def activate(pc)
39         process_controllers_exclusive do
40           @ProcessControllers[pc] ||= 0
41           @ProcessControllers[pc] += 1
42         end
43       end
45       def inactivate(pc)
46         process_controllers_exclusive do
47           if @ProcessControllers[pc]
48             if (@ProcessControllers[pc] -= 1) == 0
49               @ProcessControllers.delete(pc)
50               @ProcessControllersCV.signal
51             end
52           end
53         end
54       end
56       def each_active_object
57         process_controllers_exclusive do
58           for ref in @ProcessControllers.keys
59             yield ref
60           end
61         end
62       end
64       def block_output_synchronize(&b)
65         @BlockOutputMonitor.synchronize &b
66       end
68       def wait_to_finish_all_process_controllers
69         process_controllers_exclusive do
70           while !@ProcessControllers.empty?
71             Shell::notify("Process finishing, but active shell exists",
72                           "You can use Shell#transact or Shell#check_point for more safe execution.")
73             if Shell.debug?
74               for pc in @ProcessControllers.keys
75                 Shell::notify(" Not finished jobs in "+pc.shell.to_s)
76                 for com in pc.jobs
77                   com.notify("  Jobs: %id")
78                 end
79               end
80             end
81             @ProcessControllersCV.wait(@ProcessControllersMonitor)
82           end
83         end
84       end
85     end
87     # for shell-command complete finish at this process exit.
88     USING_AT_EXIT_WHEN_PROCESS_EXIT = true
89     at_exit do
90       wait_to_finish_all_process_controllers unless $@
91     end
93     def initialize(shell)
94       @shell = shell
95       @waiting_jobs = []
96       @active_jobs = []
97       @jobs_sync = Sync.new
99       @job_monitor = Mutex.new
100       @job_condition = ConditionVariable.new
101     end
103     attr_reader :shell
105     def jobs
106       jobs = []
107       @jobs_sync.synchronize(:SH) do
108         jobs.concat @waiting_jobs
109         jobs.concat @active_jobs
110       end
111       jobs
112     end
114     def active_jobs
115       @active_jobs
116     end
118     def waiting_jobs
119       @waiting_jobs
120     end
121     
122     def jobs_exist?
123       @jobs_sync.synchronize(:SH) do
124         @active_jobs.empty? or @waiting_jobs.empty?
125       end
126     end
128     def active_jobs_exist?
129       @jobs_sync.synchronize(:SH) do
130         @active_jobs.empty?
131       end
132     end
134     def waiting_jobs_exist?
135       @jobs_sync.synchronize(:SH) do
136         @waiting_jobs.empty?
137       end
138     end
140     # schedule a command
141     def add_schedule(command)
142       @jobs_sync.synchronize(:EX) do
143         ProcessController.activate(self)
144         if @active_jobs.empty?
145           start_job command
146         else
147           @waiting_jobs.push(command)
148         end
149       end
150     end
152     # start a job
153     def start_job(command = nil)
154       @jobs_sync.synchronize(:EX) do
155         if command
156           return if command.active?
157           @waiting_jobs.delete command
158         else
159           command = @waiting_jobs.shift
160 #         command.notify "job(%id) pre-start.", @shell.debug?
161           
162           return unless command
163         end
164         @active_jobs.push command
165         command.start
166 #       command.notify "job(%id) post-start.", @shell.debug?
168         # start all jobs that input from the job
169         for job in @waiting_jobs.dup
170           start_job(job) if job.input == command
171         end
172 #       command.notify "job(%id) post2-start.", @shell.debug?
173       end
174     end
176     def waiting_job?(job)
177       @jobs_sync.synchronize(:SH) do
178         @waiting_jobs.include?(job)
179       end
180     end
182     def active_job?(job)
183       @jobs_sync.synchronize(:SH) do
184         @active_jobs.include?(job)
185       end
186     end
188     # terminate a job
189     def terminate_job(command)
190       @jobs_sync.synchronize(:EX) do
191         @active_jobs.delete command
192         ProcessController.inactivate(self)
193         if @active_jobs.empty?
194           command.notify("start_jon in ierminate_jon(%id)", Shell::debug?)
195           start_job
196         end
197       end
198     end
200     # kill a job
201     def kill_job(sig, command)
202       @jobs_sync.synchronize(:EX) do
203         if @waiting_jobs.delete command
204           ProcessController.inactivate(self)
205           return
206         elsif @active_jobs.include?(command)
207           begin
208             r = command.kill(sig)
209             ProcessController.inactivate(self)
210           rescue
211             print "Shell: Warn: $!\n" if @shell.verbose?
212             return nil
213           end
214           @active_jobs.delete command
215           r
216         end
217       end
218     end
220     # wait for all jobs to terminate
221     def wait_all_jobs_execution
222       @job_monitor.synchronize do
223         begin
224           while !jobs.empty?
225             @job_condition.wait(@job_monitor)
226             for job in jobs
227               job.notify("waiting job(%id)", Shell::debug?)
228             end
229           end
230         ensure
231           redo unless jobs.empty?
232         end
233       end
234     end
236     # simple fork
237     def sfork(command, &block)
238       pipe_me_in, pipe_peer_out = IO.pipe
239       pipe_peer_in, pipe_me_out = IO.pipe
242       pid = nil
243       pid_mutex = Mutex.new
244       pid_cv = ConditionVariable.new
246       Thread.start do
247         ProcessController.block_output_synchronize do
248           STDOUT.flush
249           ProcessController.each_active_object do |pc|
250             for jobs in pc.active_jobs
251               jobs.flush
252             end
253           end
255           pid = fork {
256             Thread.list.each do |th| 
257 #             th.kill unless [Thread.main, Thread.current].include?(th)
258               th.kill unless Thread.current == th
259             end
261             STDIN.reopen(pipe_peer_in)
262             STDOUT.reopen(pipe_peer_out)
264             ObjectSpace.each_object(IO) do |io| 
265               if ![STDIN, STDOUT, STDERR].include?(io)
266                 io.close unless io.closed?
267               end
268             end
270             yield
271           }
272         end
273         pid_cv.signal
275         pipe_peer_in.close
276         pipe_peer_out.close
277         command.notify "job(%name:##{pid}) start", @shell.debug?
279         begin
280           _pid = nil
281           command.notify("job(%id) start to waiting finish.", @shell.debug?)
282           _pid = Process.waitpid(pid, nil)
283         rescue Errno::ECHILD
284           command.notify "warn: job(%id) was done already waitipd."
285           _pid = true
286           #     rescue
287           #       STDERR.puts $!
288         ensure
289           command.notify("Job(%id): Wait to finish when Process finished.", @shell.debug?)
290           # when the process ends, wait until the command termintes
291           if USING_AT_EXIT_WHEN_PROCESS_EXIT or _pid
292           else
293             command.notify("notice: Process finishing...",
294                            "wait for Job[%id] to finish.",
295                            "You can use Shell#transact or Shell#check_point for more safe execution.")
296             redo
297           end
298           
299 #         command.notify "job(%id) pre-pre-finish.", @shell.debug?
300           @job_monitor.synchronize do 
301 #           command.notify "job(%id) pre-finish.", @shell.debug?
302             terminate_job(command)
303 #           command.notify "job(%id) pre-finish2.", @shell.debug?
304             @job_condition.signal
305             command.notify "job(%id) finish.", @shell.debug?
306           end
307         end
308       end
310       pid_mutex.synchronize do
311         while !pid
312           pid_cv.wait(pid_mutex)
313         end
314       end
316       return pid, pipe_me_in, pipe_me_out
317     end
318   end