change rules for cluster accessible dirs.
[cxgn-corelibs.git] / lib / CXGN / Tools / Run.pm
blob5d8a7511fbb978332b1a1f8506c3bd7494e0226e
1 package CXGN::Tools::Run;
2 use strict;
3 use warnings;
4 use Carp qw/ carp confess croak /;
5 use POSIX qw( :sys_wait_h strftime );
6 use Time::HiRes qw/time/;
8 use IPC::Cmd ();
10 use Data::Dumper;
12 use File::Path;
13 use File::Temp qw( tempfile );
14 use File::Basename;
15 use File::Spec;
16 use Cwd;
17 use UNIVERSAL qw/isa/;
18 use File::NFSLock qw/uncache/;
20 use Storable qw/ nstore retrieve /;
22 use constant DEBUG => $ENV{CXGNTOOLSRUNDEBUG} ? 1 : 0;
# dbp: debug print. When DEBUG is on (the CXGNTOOLSRUNDEBUG environment
# variable was true when this module was compiled), prints its arguments to
# STDERR prefixed with '# dbg <package>: ', adding a newline unless the last
# argument already ends with one. Works both as a function and as a method:
# a leading object of this class (or a subclass) is discarded first.
# Always returns 1.
sub dbp(@) {
    return 1 if !DEBUG;

    # drop the invocant when called as $self->dbp(...)
    my $invocant = $_[0];
    if ( ref($invocant) && ref($invocant) =~ /::/ and $invocant->isa(__PACKAGE__) ) {
        shift;
    }

    print STDERR '# dbg ' . __PACKAGE__ . ': ', @_;
    print STDERR "\n" if $_[-1] !~ /\n$/;
    return 1;
}
# dprinta: when DEBUG is on, prints its arguments to STDERR on one line,
# separated by single spaces, with references rendered by Data::Dumper
# (Indent = 0, so each stays on a single line), followed by a newline.
# NOTE(review): _submit_cluster_job builds its qsub command line from
# join(' ', dprinta(...)), so this sub is relied on to return its
# arguments unchanged; the end of its body is not visible in this copy --
# confirm that it does.
33 sub dprinta(@) {
34 if(DEBUG) {
35 local $Data::Dumper::Indent = 0;
36 print STDERR join(' ',map {ref($_) ? Dumper($_) : $_} @_)."\n";
42 =head1 NAME
44 CXGN::Tools::Run - run an external command, either synchronously
45 or asynchronously (in the background).
47 =head1 SYNOPSIS
49 ############ SYNCHRONOUS MODE #############
51 #just run the program, collecting its stderr and stdout outputs
53 my $run = CXGN::Tools::Run->run( 'fooprogram',
54 -i => 'myfile.seq',
55 -d => '/my/blast/databases/nr',
56 -e => '1e-10',
59 print "fooprogram printed '", $run->out, "' on stdout";
60 print "and it printed '", $run->err, "' on stderr";
62 ############ ASYNCHRONOUS MODE ############
64 #run the program in the background while your script does other
65 #things, or even exits
67 my $sleeper = CXGN::Tools::Run->run_async('sleep',600);
68 $sleeper->is_async
69 or die "But I ran this as asynchronous!";
70 $sleeper->alive
71 or die "Hey, it's not running!\n";
73 $sleeper->wait; #waits for the process to finish
75 $sleeper->die; #kills the process
76 $sleeper->cleanup; #don't forget to clean it up, deletes tempfiles
79 ############ RUNNING ON THE CLUSTER #########
81 #run the job, with a temp_base directory of /data/shared/tmp
82 my $cjob = CXGN::Tools::Run->run_cluster('sleep',600, {temp_base => '/data/shared/tmp'});
84 print "the Torque job id for that thing is ",$cjob->job_id,"\n";
86 #alive, wait, die, and all the rest work the same as for async
87 $cjob->cleanup; #don't forget to clean it up, deletes tempfiles
90 =head1 DESCRIPTION
92 This class is a handy way to run an external program, either in the
93 foreground, in the background, or as a cluster job, connecting files
94 or filehandles to its STDIN, STDOUT, and STDERR. Furthermore, the
95 objects of this class can be saved and restored with Storable, letting
96 you migrate backgrounded jobs between different instances of the
97 controlling program, such as when you're doing a web or SOAP interface
98 to a long-running computation.
100 One important difference between using this module to run things and
101 using system() is that interrupt or kill signals are propagated to
102 child processes, so if you hit ctrl-C on your perl program, any
103 programs *IT* is running will also receive the SIGTERM.
105 If you want to see debugging output from this module, set an
106 environment variable CXGNTOOLSRUNDEBUG to something true, like "1", or
107 "you know a biologist is lying when they tell you something is
108 _always_ true".
110 =head1 CONSTRUCTORS
112 =cut
# Private get/set scalar accessors generated by Class::MethodMaker.
# NOTE(review): under Class::MethodMaker's option syntax the {-default => 1}
# hashref below presumably gives a default of 1 to every name listed after
# it (_raise_error, _procs_per_node, _nodes). This matches the run() POD
# ("raise_error ... default true") and _process_common_options, which only
# ever sets _raise_error(0).
114 #make some private accessors
116 use Class::MethodMaker
117 [ scalar => [
118 'in_file', #holds filename or filehandle used to provide stdin
119 'out_file', #holds filename or filehandle used to capture stdout
120 'err_file', #holds filename or filehandle used to capture stderr
121 '_temp_base', #holds the object-specific temp_base, if set
122 '_max_cluster_jobs',#holds the object-specific max_cluster_jobs, if set
123 '_existing_temp', #holds whether we're using someone else's tempdir
124 '_told_to_die', #holds whether this job has been told to die
125 '_working_dir', #holds name of the process's working directory
126 '_die_on_destroy', #set to true if we should kill our subprocess
127 #when this object is destroyed
128 '_pid', #holds the pid of our background process, if any
129 '_jobid', #holds the jobid of our cluster process, if any
130 '_jobdest', #holds the server/queue destination
131 #where we submitted a cluster job
132 '_error_string', #holds our die error, if any
133 '_command', #holds the command string that was executed
134 '_job_name', #small name to use in tempdir names and job submission
135 '_host', #hostname where the command ran
136 '_start_time', #holds the time() from when we started the job
137 '_end_time', #holds the approximate time from when the job finished
138 '_exit_status', #holds the exit status ($?) from our job
139 '_on_completion', #subref to be run when job completes
140 '_already_ran_completion_hooks', #flag, set when completion hooks have been run
141 '_vmem', #megabytes of memory the process is
142 #estimated to require
143 {-default => 1},
144 '_raise_error', #holds whether we throw errors or just store
145 #them in _error_string. defaults to 1 (see -default above)
146 '_procs_per_node', #holds the number of processors to use for cluster
147 #and other parallel calls
148 '_nodes', #holds a torque-compliant nodelist, default of '1'
149 #not used for regular and _async runs
154 =head2 run
156 Usage: my $slept = CXGN::Tools::Run->run('sleep',3);
157 #would sleep 3 seconds, then return the object
158 Desc : run the program and wait for it to finish
159 Ret : a new CXGN::Tools::Run object
160 Args : executable to run (absolute or relative path, will also search $ENV{PATH}),
161 argument,
162 argument,
163 ...,
164 { in_file => filename or filehandle to put on job's STDIN,
165 out_file => filename or filehandle to capture job's STDOUT,
166 err_file => filename or filehandle to capture job's STDERR,
167 working_dir => path of working directory to run the program,
168 temp_base => path under which to put this job's temp dir
169 defaults to whatever the class accessor temp_base()
170 is set to,
171 existing_temp => use this existing temp dir for storing your out, err,
172 and die files. will not automatically delete it
173 at the end of the script
174 raise_error => true if it should die on error, false otherwise.
175 default true
176 on_completion => subroutine ref to run when the job is
177 finished. runs synchronously in the
178 parent process that spawned the job,
179 usually called from inside $job->alive() or wait().
180 passed one arg: the $job_object
182 Side Effects: runs the program, waiting for it to finish
184 =cut
# run: synchronous constructor (see the POD above). Runs the command in
# the foreground via run3 in the configured working dir, always returning
# to the caller's directory afterwards. If it fails, the error is stored
# with _error_string; with raise_error set (the default), it is also
# written to the die file and croaked with context. Records start/end
# times and the exit status, then runs any on_completion hooks.
sub run {
    my ( $class, @args ) = @_;
    my $self = bless {}, $class;

    $ENV{MOD_PERL} and croak "CXGN::Tools::Run->run() not functional under mod_perl";

    my $options = $self->_pop_options( \@args );
    $self->_process_common_options($options);

    # now start the process and die informatively if it errors
    $self->_start_time(time);

    my $curdir = cwd();
    eval {
        chdir $self->working_dir
            or die "Could not change directory into '" . $self->working_dir . "': $!";
        my $cmd = @args > 1 ? \@args : $args[0];
        CXGN::Tools::Run::Run3::run3( $cmd, $self->in_file, $self->out_file,
            $self->err_file, $self->tempdir );
    };
    my $error       = $@;
    my $exit_status = $?;    # exit status of what we ran

    # Return to the caller's directory whether or not the command worked.
    # Otherwise a failed run with raise_error => 0 would leave the whole
    # process sitting in the job's working directory.
    unless ( chdir $curdir ) {
        $error ||= "Could not cd back to parent working directory '$curdir': $!";
    }

    if ($error) {
        $self->_error_string($error);
        if ( $self->_raise_error ) {
            # write die messages to a file for later retrieval by interested
            # parties, chiefly the parent process if this is a cluster job
            $self->_write_die($error);
            croak $self->_format_error_message($error);
        }
    }

    $self->_end_time(time);
    $self->_exit_status($exit_status);    # save the exit status of what we ran

    $self->_run_completion_hooks;    #< run any on_completion hooks we have

    return $self;
}
221 =head2 run_async
223 Usage: my $sleeper = CXGN::Tools::Run->run_async('sleep',3);
224 Desc : run an external command in the background
225 Ret : a new L<CXGN::Tools::Run> object, which is a handle
226 for your running process
227 Args : executable to run (absolute or relative path, will also search $ENV{PATH}),
228 argument,
229 argument,
230 ...,
231 { die_on_destroy => 1, #default is 0, does not matter for a synchronous run.
232 in_file => filename or filehandle,
233 out_file => filename or filehandle,
234 err_file => filename or filehandle,
235 working_dir => path of working dir to run this program
236 temp_base => path under which to put this job's temp dir,
237 defaults to whatever the class accessor temp_base()
238 is set to,
239 existing_temp => use this existing temp dir for storing your out, err,
240 and die files. will not automatically delete it
241 at the end of the script
242 raise_error => true if it should die on error, false otherwise.
243 default true
244 on_completion => subroutine ref to run when the job is
245 finished. runs synchronously in the
246 parent process that spawned the job,
247 usually called from inside $job->alive() or wait()
248 passed one arg: the $job_object
250 Side Effects: runs the given command in the background, dies if the program
251 terminated abnormally
253 If you set die_on_destroy in the options hash, the backgrounded program
254 will be killed whenever this object goes out of scope.
256 =cut
# run_async: background constructor (see the POD above). Creates the
# job's tempdir, then forks.
# The child switches any IO::Pipe handles to the right end, runs the
# command via run3 in the working dir, records any error in the die file,
# closes its handles and hard-exits.
# The parent records the child's pid, croaks if the child has already
# left a die file (when raise_error is set), and returns the job object.
sub run_async {
    my ( $class, @args ) = @_;
    my $self = bless {}, $class;

    $ENV{MOD_PERL} and croak "CXGN::Tools::Run->run_async() not functional under mod_perl";

    my $options = $self->_pop_options( \@args );
    $self->_process_common_options($options);
    $self->is_async(1);

    # make sure we have a temp directory made already before we fork;
    # calling tempdir() makes this directory and returns its name.
    # dbp is debug print, which only prints if $ENV{CXGNTOOLSRUNDEBUG} is set
    $self->dbp( 'starting background process with tempdir ', $self->tempdir );

    my $pid = fork;
    # fork() returns undef on failure. Without this check the parent would
    # take the child branch below: run the job in the foreground, then
    # POSIX::_exit(), silently terminating the calling program.
    defined $pid
        or croak "CXGN::Tools::Run->run_async(): could not fork: $!";

    unless ($pid) {
        # CODE FOR THE BACKGROUND PROCESS THAT RUNS THIS JOB
        my $curdir = cwd();
        eval {
            # handle setting reader/writer on IO::Pipe objects if any were
            # passed in; each handle is checked for itself
            $self->in_file->reader  if isa( $self->in_file,  'IO::Pipe' );
            $self->out_file->writer if isa( $self->out_file, 'IO::Pipe' );
            $self->err_file->writer if isa( $self->err_file, 'IO::Pipe' );

            chdir $self->working_dir
                or die "Could not cd to new working directory '" . $self->working_dir . "': $!";
            my $cmd = @args > 1 ? \@args : $args[0];
            CXGN::Tools::Run::Run3::run3( $cmd, $self->in_file, $self->out_file,
                $self->err_file, $self->tempdir );
            chdir $curdir or die "Could not cd back to parent working dir '$curdir': $!";
        };
        if ( my $child_error = $@ ) {
            # Write die messages to a file for later retrieval by the parent
            # process. This is guarded so that a failure here cannot unwind
            # into the parent's (copied) call stack and keep running the
            # parent's code in this child.
            eval { $self->_write_die($child_error) };
        }

        # explicitly close all our filehandles, cause the hard exit doesn't do it
        foreach my $fh ( $self->in_file, $self->out_file, $self->err_file ) {
            close $fh if isa( $fh, 'IO::Handle' );
        }
        POSIX::_exit(0);    # HARD exit: skip END blocks and DESTROYs from our parent
    }

    # CODE FOR THE PARENT
    $self->_pid($pid);

    $self->_die_if_error;    # check if it's died
    return $self;
}
315 =head2 run_cluster
317 Usage: my $sleeper = CXGN::Tools::Run->run_cluster('sleep',30);
318 Desc : run a command on a cluster using the 'qsub' command
319 Ret : a new L<CXGN::Tools::Run> object, which is a handle
320 for your running cluster job
321 Args : executable to run (absolute or relative path, will also search $ENV{PATH}),
322 argument,
323 argument,
324 ...,
325 { die_on_destroy => 1, #default is 0
326 in_file => do not use, not yet supported for cluster jobs
327 out_file => filename, defaults to a new one created internally,
328 err_file => filename, defaults to a new one created internally,
329 working_dir => path of working dir to run this program
330 temp_base => path under which to put this job's temp dir
331 defaults to whatever the class accessor temp_base()
332 is set to,
333 on_completion => subroutine ref to run when the job is
334 finished. runs synchronously in the
335 parent process that spawned the job,
336 usually called from inside $job->alive()
337 passed one arg: the $job_object
338 existing_temp => use this existing temp dir for storing your out, err,
339 and die files. will not automatically delete it
340 at the end of the script
341 raise_error => true if it should die on error, false otherwise.
342 default true,
343 nodes => torque-compatible node list to use for running this job. default is '1',
344 procs_per_node => number of processes this job will use on each node. default 1,
345 vmem => estimate of total virtual memory (RAM) used by the process, in megabytes,
346 queue => torque-compatible job queue specification string, e.g. 'batch@solanine',
347 if running in a web environment, defaults to the value of the
348 'web_cluster_queue' conf key, otherwise, defaults to blank, which will
349 use the default queue that the 'qsub' command is configured to use.
351 Side Effects: runs the given command in the background, dies if the program
352 terminated abnormally
354 If you set die_on_destroy in the options hash, the job will be killed with `qdel`
355 if this object goes out of scope.
357 =cut
# run_cluster: construct a job object and submit the command as a cluster
# job (see the POD above). _pop_options splits off the trailing options
# hashref, _process_common_options applies it, and _run_cluster does the
# validation and qsub submission. Returns whatever _run_cluster returns.
sub run_cluster {
    my $class   = shift;
    my @command = @_;

    my $job     = bless {}, $class;
    my $options = $job->_pop_options( \@command );
    $job->_process_common_options($options);

    return $job->_run_cluster( \@command, $options );
}
# _run_cluster: common back end for run_cluster() and run_cluster_perl().
# Steps:
#   - marks this object as a cluster job and picks the submission queue;
#   - checks that qsub is installed;
#   - checks that the tempdir, out/err files and working dir look reachable
#     from the cluster nodes, croaking otherwise;
#   - waits while the queue is over max_cluster_jobs;
#   - writes a perl wrapper script to a local temp file. The script re-runs
#     $cmd through CXGN::Tools::Run->run on the node and has this module's
#     own source appended;
#   - submits the script with _submit_cluster_job, retrying failures.
#   Args: $cmd     - arrayref: the executable followed by its arguments
#         $options - the options hashref; only 'queue' is read directly here
#   Ret : $self. Dies if every submission attempt fails; may croak via
#         _die_if_error if the job has already left a 'died' file.
370 sub _run_cluster {
371 my ( $self, $cmd, $options ) = @_;
373 $self->is_cluster(1);
375 $self->_command( $cmd ); #< store the command for use in error messages
377 # set our job destination from configuration if running under the website
378 if( defined $options->{queue} ) {
379 $self->_jobdest($options->{queue});
381 # TODO: change all SGN code to pass in queue =>, then remove this!
382 elsif( defined $ENV{PROJECT_NAME} && $ENV{PROJECT_NAME} eq 'SGN' ) {
383 require SGN::Context;
384 if( my $q = SGN::Context->new->get_conf('web_cluster_queue') ) {
385 $self->_jobdest( $q );
389 #check that qsub is actually in the path
390 IPC::Cmd::can_run('qsub')
391 or croak "qsub command not in path, cannot submit jobs to the cluster. "
392 ."Maybe you need to install the torque package?";
394 #check that our out_file, err_file, and in_file are accessible from the cluster nodes
# NOTE(review): although written inside _run_cluster, cluster_accessible is
# an ordinary named sub of this package, compiled once.
# Its pattern is not anchored, so any absolute path that merely *contains*
# /home, /crypt or /data|export/(shared|prod|trunk) somewhere is accepted,
# e.g. /tmp/homework. Consider anchoring the pattern at the start and
# requiring a '/' or end of string after the directory name.
# The croak messages below still mention only /data/shared and /data/prod,
# although the pattern also accepts the other locations (optionally under
# /net/<host>).
395 sub cluster_accessible {
396 my $path = shift;
397 # warn "relpath $path\n";
398 $path = File::Spec->rel2abs("$path");
399 # warn "abspath $path\n";
400 return 1 if $path =~ m!(/net/[^/]+)?(/(data|export)/(shared|prod|trunk)|/(home|crypt))!;
401 return 0;
404 my $tempdir = $self->tempdir;
405 $self->in_file
406 and croak "in_file not supported by run_cluster";
407 foreach my $acc ('out_file','err_file') {
408 my $file = $self->$acc;
409 $file = $self->$acc("$file"); #< stringify the argument
# (this tempdir check does not depend on $acc; it is simply repeated on each pass)
411 croak "tempdir ".$self->tempdir." is not on /data/shared or /data/prod, but needs to be for cluster jobs. Do you need to set a different temp_base?\n"
412 unless cluster_accessible($tempdir);
# reject values that stringified to something like GLOB(0x...) or Foo=HASH(0x...)
414 croak "filehandle or non-stringifying out_file, err_file, or in_file not supported by run_cluster"
415 if $file =~ /^([\w:]+=)?[A-Z]+\(0x[\da-f]+\)$/;
416 #print "file was $file\n";
418 unless(cluster_accessible($file)) {
419 if(index($file,$tempdir) != -1) {
420 croak "tempdir ".$self->tempdir." is not on /data/shared or /data/prod, but needs to be for cluster jobs. Do you need to set a different temp_base?\n";
421 } else {
422 croak "'$file' must be in a subdirectory of /data/shared or /data/prod in order to be accessible to all cluster nodes";
427 #check that our working directory, if set, is accessible from the cluster nodes
428 if($self->_working_dir_isset) {
429 cluster_accessible($self->_working_dir)
430 or croak "working directory '".$self->_working_dir."' is not a subdirectory of /data/shared or /data/prod, but should be in order to be accessible to the cluster nodes";
434 # if the cluster head node is currently is running more than
435 # max_cluster_jobs jobs, don't overload it, block until the number
436 # of jobs goes down. prints a warning the first time in the run
437 # that this happens
438 $self->_wait_for_overloaded_cluster;
440 ###submit the job with qsub in the form of a bash script that contains a perl script
441 #we do this so we can use CXGN::Tools::Run on the node to write the job's out, err and die files
442 my $working_dir = $self->_working_dir_isset ? "working_dir => '".$self->_working_dir."'," : '';
# Dumper (Terse, Indent 0) turns each command word into a quoted Perl string
# literal; joined with commas they form run()'s argument list in the script
443 my $cmd_string = do {
444 local $Data::Dumper::Terse = 1;
445 local $Data::Dumper::Indent = 0;
446 join ', ', map Dumper( "$_" ), @$cmd;
448 my $outfile = $self->out_file;
449 my $errfile = $self->err_file;
# Generate the wrapper script in two parts:
#   1. a non-interpolating part that adopts the PBS_O_* environment of the
#      submitting shell;
#   2. an interpolating part that runs the command through run() with the
#      same out/err files and this job's tempdir (existing_temp), so any die
#      message lands in the 'died' file _die_if_error looks for.
# NOTE(review): the paths are pasted between single quotes, so a path
# containing a ' would break the generated script.
451 $cmd_string = <<'EOSCRIPT'
452 #!/usr/bin/env perl
454 # take PBS_O_* environment variables as our own, overriding local
455 # node settings
456 %ENV = ( %ENV,
457 map {
458 my $orig = $_;
459 if(s/PBS_O_//) {
460 $_ => $ENV{$orig}
461 } else {
465 keys %ENV
468 EOSCRIPT
469 .<<EOSCRIPT;
470 CXGN::Tools::Run->run($cmd_string,
471 { out_file => '$outfile',
472 err_file => '$errfile',
473 existing_temp => '$tempdir',
474 $working_dir
477 EOSCRIPT
479 dbp "running cmd_string:\n$cmd_string\n";
481 # also, include a copy of this very module!
482 $cmd_string .= $self->_file_contents( __FILE__ );
483 # disguise the ending EOF so that it passes through the file inclusion
# NOTE(review): no statement implementing this "disguise" step follows
# here -- confirm whether it is still needed.
485 #$self->dbp("cluster running command '$cmd_string'");
# write the script to a local temp file; that file is what gets handed to qsub
486 my $cmd_temp_file = File::Temp->new( TEMPLATE =>
487 File::Spec->catfile( File::Spec->tmpdir, 'cxgn-tools-run-cmd-temp-XXXXXX' )
489 $cmd_temp_file->print( $cmd_string );
490 $cmd_temp_file->close;
492 my $retry_count;
493 my $qsub_retry_limit = 3; #< retry up to 3 times, i.e. at most 4 submission attempts in total
494 my $submit_success;
495 until( ($submit_success = $self->_submit_cluster_job( $cmd_temp_file )) || ++$retry_count > $qsub_retry_limit ) {
496 sleep 1;
497 warn "CXGN::Tools::Run retrying cluster job submission.\n";
499 $submit_success or die "CXGN::Tools::Run: failed to submit cluster job, after $retry_count tries\n";
501 $self->_die_if_error;
503 return $self;
# Submit the wrapper script in $cmd_temp_file with qsub, using this job's
# err_file, job name, working dir, queue and resource requests
# (nodes/ppn/vmem). On success records the job id via _jobid and returns 1.
# If qsub's output does not look like a Torque job id, warns with that
# output and returns false so the caller can retry.
sub _submit_cluster_job {
    my ( $self, $cmd_temp_file ) = @_;

    my %resources = (
        ppn   => $self->_procs_per_node || undef,
        nodes => $self->_nodes || 1,
        vmem  => $self->_vmem || undef,
    );
    my $resource_str = $self->_make_torque_resource_string( \%resources );

    # dprinta echoes the words under DEBUG and is relied on to return them
    my @qsub_words = dprinta(
        'qsub',
        '-V',
        -r => 'n',    #< not rerunnable, cause we'll notice it died
        -o => '/dev/null',
        -e => $self->err_file,
        -N => $self->_job_name,
        ( $self->_working_dir_isset ? ( -d => $self->working_dir ) : () ),
        ( $self->_jobdest_isset     ? ( -q => $self->_jobdest )    : () ),
        -l => $resource_str,
        $cmd_temp_file,
    );

    # Single-quote every word for /bin/sh so that file names containing
    # spaces or shell metacharacters reach qsub intact.
    my $qsub_cmd = join ' ', map {
        my $word = "$_";
        $word =~ s/'/'\\''/g;
        "'$word'";
    } @qsub_words;

    my $jobid = `$qsub_cmd 2>&1`;    #< string to hold the job ID of this job submission
    $jobid = '' unless defined $jobid;

    # test hook for testing a qsub failure, makes the test fail the first time
    if ( $ENV{CXGN_TOOLS_RUN_FORCE_QSUB_FAILURE} ) {
        $jobid = $ENV{CXGN_TOOLS_RUN_FORCE_QSUB_FAILURE};
        delete $ENV{CXGN_TOOLS_RUN_FORCE_QSUB_FAILURE};
    }

    $self->_flush_qstat_cache;    #< force a qstat update

    # check that we got a sane job id
    chomp $jobid;
    unless ( $jobid =~ /^\d+(\.[a-zA-Z0-9-]+)+$/ ) {
        warn "CXGN::Tools::Run error running `qsub`: $jobid\n";
        return;
    }

    dbp("got jobid $jobid");

    $self->_jobid($jobid);    #< remember our job id

    return 1;
}
# Build a Torque "-l" resource string from a hashref of resource => value.
#   - a true ppn is folded into the nodes spec (nodes=N:ppn=P);
#   - a defined vmem gets an 'm' (megabytes) suffix;
#   - resources whose value is undef are omitted;
#   - the rest are emitted as type=value pairs, joined with commas, in
#     sorted key order.
# The caller's hash is copied, never modified.
sub _make_torque_resource_string {
    my ( undef, $requested ) = @_;
    my %res = %$requested;

    if ( my $ppn = delete $res{ppn} ) {
        $res{nodes} .= ":ppn=$ppn";
    }
    $res{vmem} .= 'm' if defined $res{vmem};

    my @pairs;
    foreach my $type ( sort keys %res ) {
        next unless defined $res{$type};
        push @pairs, "$type=$res{$type}";
    }
    return join ',', @pairs;
}
586 =head2 run_cluster_perl
588 Usage: my $job = CXGN::Tools::Run->run_cluster_perl({ args => see below })
590 Desc : Like run_cluster, but calls a perl class method on a cluster
591 node with the given args. The method args can be anything
592 that Storable can serialize. The actual job launched on the
593 node is something like:
594 perl -M$class -e '$class->$method_name(@args)'
596 where the @args are exactly what you pass in method_args.
598 Args : { method => [ Class::Name => 'method_to_run' ],
599 args => arrayref of the method's arguments (can
600 be objects, datastructures, whatever),
602 (optional)
603 run_opts => hashref of CXGN::Tools::Run options (see
604 run_cluster() above),
605 load_packages => arrayref of perl packages to
606 require before deserializing the arguments,
607 perl => string or arrayref specifying how to invoke
608 perl on the remote node, defaults to
609 [ '/usr/bin/env', 'perl' ]
611 Ret : a job object, same as run_cluster
613 =cut
# run_cluster_perl: see the POD above. Serializes the method arguments (if
# any) with Storable into args.dat in the job's tempdir, then submits a
# cluster job via _run_cluster that runs
#   <perl> -MStorable=retrieve -M<this class> -e '<requires> Class->method(...)'
# and returns the job object.
sub run_cluster_perl {
    my ( $class, $args ) = @_;
    my ( $perl, $method_args, $run_args, $packages ) =
        @{$args}{qw{ perl args run_opts load_packages }};

    ref $args->{method} eq 'ARRAY'
        or croak "run_cluster_perl: 'method' must be an arrayref [ Class::Name => 'method_name' ]";
    my ( $method_class, $method_name ) = @{ $args->{method} };

    $method_args ||= [];
    $run_args    ||= {};
    $perl        ||= [qw[ /usr/bin/env perl ]];
    my @perl = ref $perl ? @$perl : ($perl);

    my $self = bless {}, $class;
    $self->_job_name($method_name);
    $self->_process_common_options($run_args);

    $packages ||= [];
    $packages = [$packages] unless ref $packages;
    my $requires = join '', map "require $_; ", @$packages;

    # Arguments travel to the node in a Storable file inside the job's
    # tempdir. Its path is embedded in the generated code as a
    # single-quoted Perl string (with ' and \ escaped), so a '$' or '@' in
    # the path cannot be interpolated by the remote perl.
    my $call_args = '';
    if (@$method_args) {
        my $args_file = File::Spec->catfile( $self->tempdir, 'args.dat' );
        nstore( $method_args => $args_file )
            or croak "run_cluster_perl: $! serializing args to '$args_file'";
        ( my $args_literal = $args_file ) =~ s/(['\\])/\\$1/g;
        $call_args = "\@{retrieve('${args_literal}')}";
    }

    return $self->_run_cluster(
        [   @perl,
            '-MStorable=retrieve',
            '-M' . $class,
            -e => $requires . $method_class . '->' . $method_name . "($call_args)",
        ],
        $run_args,
    );
}
# Target method used by the test suite's run_cluster_perl() checks: prints a
# fixed marker followed by its comma-joined arguments in parentheses, and
# returns print's result.
sub _run_cluster_perl_test {
    my $arg_list = join ',', @_;
    return print "a string for use by the test suite ($arg_list)";
}
# Throttle cluster submissions: if the queue already holds at least
# max_cluster_jobs jobs, block, sleeping a random 0-119 seconds between
# re-checks, until it drops below that limit. The first time this happens
# in the process a warning is carped; later waits are silent.
{
    my $already_warned;    # shared by every job object in this process

    sub _wait_for_overloaded_cluster {
        my $self  = shift;
        my $limit = $self->_max_cluster_jobs;

        return if $self->_cluster_queue_jobs_count < $limit;

        # warn the first time the cluster-full condition is encountered
        carp __PACKAGE__ . ": WARNING: cluster queue contains more than "
            . $limit
            . " (max_cluster_jobs) jobs, throttling job submissions\n"
            unless $already_warned++;

        until ( $self->_cluster_queue_jobs_count < $limit ) {
            sleep int rand 120;
        }
    }
}
# Split the trailing options hashref (if any) off the run*() argument list
# and record the remaining command with _command, for later error
# messages. Unless a job name is already set, the job is named after the
# basename of the executable: the leading run of non-space, non-quote
# characters of the first argument.
# Croaks if any argument is undefined. Returns the options hashref ({} if
# none was given).
sub _pop_options {
    my ( $self, $args ) = @_;

    # make sure all of our args are defined
    foreach my $arg (@$args) {
        defined $arg or croak "undefined argument passed to run method";
    }

    my $options = {};
    $options = pop @$args if ref( $args->[-1] ) eq 'HASH';

    # store our command array for later use in error messages and such
    $self->_command($args);

    if ( !$self->_job_name ) {
        my ($executable) = $self->_command->[0] =~ /^([^'\s]+)/;
        $self->_job_name( $executable ? basename($executable) : '' );
    }

    return $options;
}
#process the options hash and set the correct parameters in our object
#use for input and output
#
# Croaks on any option name not in %valid_option. Otherwise copies each
# recognized option into the matching accessor, filling in default out/err
# files inside the job's tempdir, and resets is_cluster/is_async to 0.
# Returns the same hashref.
sub _process_common_options {
    my ( $self, $options ) = @_;

    my %valid_option = map { $_ => 1 } qw(
        in_file
        out_file
        err_file
        working_dir
        temp_base
        max_cluster_jobs
        existing_temp
        raise_error
        die_on_destroy
        procs_per_node
        on_completion
        nodes
        vmem
        queue
    );
    foreach my $name ( keys %$options ) {
        next if $valid_option{$name};
        croak "'$name' is not a valid option for run_*() methods\n";
    }

    # given a filehandle or filename, absolutify it if it is a filename;
    # a false value gives an empty return
    sub abs_if_filename($) {
        my ($name) = @_;
        return unless $name;
        return ref($name) ? $name : File::Spec->rel2abs($name);
    }

    # set our temp_base, if given
    if ( defined $options->{temp_base} ) {
        $self->_temp_base( $options->{temp_base} );
    }

    # if an existing temp dir has been passed, verify that it exists and is
    # writable, and use it instead of making a new one
    my $existing_temp = $options->{existing_temp};
    if ( defined $existing_temp ) {
        $self->{tempdir} = $existing_temp;
        croak "existing_temp '$options->{existing_temp}' does not exist" unless -d $self->{tempdir};
        croak "existing_temp '$options->{existing_temp}' is not writable" unless -w $self->{tempdir};
        $self->_existing_temp(1);
    }

    # files for the stdout and stderr of the program: default to files in
    # our tempdir, and use absolute names in case the working dir changes
    my $out_file = $options->{out_file} || File::Spec->catfile( $self->tempdir, 'out' );
    $self->out_file( abs_if_filename($out_file) );
    my $err_file = $options->{err_file} || File::Spec->catfile( $self->tempdir, 'err' );
    $self->err_file( abs_if_filename($err_file) );
    $self->in_file( abs_if_filename $options->{in_file} );

    $self->working_dir( $options->{working_dir} );

    dbp "Got dirs and files ", map { $_ ||= ''; "'$_' " } $self->out_file, $self->err_file, $self->in_file, $self->working_dir;

    $self->_die_on_destroy(1) if $options->{die_on_destroy};
    $self->_raise_error(0) if defined( $options->{raise_error} ) && !$options->{raise_error};

    # cluster resource requests: only override the accessor defaults when given
    foreach my $resource (qw( procs_per_node nodes vmem )) {
        my $setter = "_$resource";
        $self->$setter( $options->{$resource} ) if defined $options->{$resource};
    }

    $self->_max_cluster_jobs( $options->{max_cluster_jobs} || 2000 );

    if ( defined( my $hooks = $options->{on_completion} ) ) {
        $hooks = [$hooks] unless ref $hooks eq 'ARRAY';
        foreach my $hook (@$hooks) {
            ref $hook eq 'CODE'
                or croak 'on_completion hooks must each be a CODE reference';
        }
        $self->_on_completion($hooks);
    }

    # set is_cluster and is_async defaults
    $self->is_cluster(0);
    $self->is_async(0);

    return $options;
}
# NOTE: the temp_base class method is deprecated; do not use it in new code.
# Pass temp_base to each run*() invocation instead.
#
# temp_base: class method to get/set the base directory under which a
# class's job objects put their temp dirs. Values are kept per class name:
# CXGN::Tools::Run itself starts out as File::Spec->tmpdir (usually /tmp),
# and any other class name starts out undef.
#   Usage: CXGN::Tools::Run->temp_base('/data/local/temp');
#   Args : (optional) new base directory
#   Ret  : the current base directory for the invocant class
{
    my %temp_base_for = ( __PACKAGE__, File::Spec->tmpdir );

    sub temp_base {
        my $class = shift;
        if (@_) {
            $temp_base_for{$class} = shift;
        }
        return $temp_base_for{$class};
    }
}
818 =head1 OBJECT METHODS
820 =head2 tempdir
822 Usage: my $dir = $job->tempdir;
823 Desc : get this object's temporary directory
824 Ret : the name of a unique temp directory used for
825 storing the output of this job
826 Args : none
827 Side Effects: creates a temp directory if one has not yet
828 been created
830 =cut
#object accessor that returns a path to an exclusive
#temp dir for that object. Does not actually
#create a temp directory until called.

# characters used to build the random directory levels under the temp stem
my @CHARS = ( 'A' .. 'Z', 'a' .. 'z', 0 .. 9, '_' );

sub tempdir {
    my ($self) = @_;

    # return our current temp dir if we have one
    return $self->{tempdir} if $self->{tempdir};

    # Otherwise make a new one. It is nested under $numlevels randomly
    # named directories of $numchars characters each, so that no single
    # directory accumulates too many job dirs.
    my $numlevels = 5;
    my $numchars  = 2;

    # index with `rand @CHARS` so every character (including the last,
    # '_') can be chosen; `rand $#CHARS` never picked the final element
    my $random_level = sub {
        join '', map { $CHARS[ int rand @CHARS ] } 1 .. $numchars;
    };

    # getpwuid can fail (e.g. no passwd entry for this uid); fall back to
    # the numeric uid rather than an empty name
    my $username = getpwuid $>;
    $username = $> unless defined $username && length $username;

    my $temp_stem = File::Spec->catdir(
        ( $self->_temp_base() || __PACKAGE__->temp_base() ),
        $username . '-cxgn-tools-run-tempfiles',
        ( map { $random_level->() } 1 .. $numlevels ),
    );
    mkpath($temp_stem);
    -d $temp_stem or die "could not make temp stem '$temp_stem'\n";

    my $job_name = $self->_job_name || 'cxgn-tools-run';

    my $newtemp = File::Temp::tempdir(
        $job_name . '-XXXXXX',
        DIR     => $temp_stem,
        CLEANUP => 0,    #don't delete our kids' tempfiles
    );
    -d $newtemp or die __PACKAGE__ . ": Could not make temp dir $newtemp : $!";
    -w $newtemp or die __PACKAGE__ . ": Temp dir $newtemp is not writable: $!";

    $self->{tempdir} = $newtemp;
    dbp "Made new temp dir $newtemp\n";

    return $self->{tempdir};
}
# Path of the file in which a job records the message it died with: a file
# named 'died' inside this job's tempdir.
sub _diefile_name {
    my ($self) = @_;
    my $dir = $self->tempdir;
    return File::Spec->catfile( $dir, 'died' );
}
#write a properly formatted error message to our diefile
#
# Writes _format_error_message($error) to _diefile_name, replacing any
# previous contents. Uses a three-argument open so the file name is taken
# literally (two-argument open strips surrounding whitespace and treats
# leading mode characters specially), and checks close() so that buffered
# write errors are not lost. Dies if the file cannot be opened or written;
# returns 1 otherwise.
sub _write_die {
    my ( $self, $error ) = @_;
    my $diefile_name = $self->_diefile_name;

    open my $diefile, '>', $diefile_name
        or die "Could not open file $diefile_name: $error: $!";
    print {$diefile} $self->_format_error_message($error);
    close $diefile
        or die "Could not write file $diefile_name: $error: $!";

    return 1;
}
# croak()s if our subprocess terminated abnormally
#
# For async and cluster jobs, checks whether the subprocess left a 'died'
# file in our tempdir. If so:
#   - reads the error message from it;
#   - for cluster jobs, prepends the job id and any "=>> PBS:" lines found
#     in err_file, and appends `qstat -f` output;
#   - for async jobs with a known pid, kills the child's process group;
#   - stores the message with _error_string;
#   - croaks with it when raise_error is set, unless we told the job to die
#     ourselves and the message says it got SIGINT/QUIT/TERM.
sub _die_if_error {
    my $self = shift;

    return unless ( $self->is_async || $self->is_cluster )
        && $self->_diefile_exists;

    my $error_string = $self->_file_contents( $self->_diefile_name );

    if ( $self->is_cluster ) {
        # if it's a cluster job, look for warnings from the resource
        # manager in the error file and include those in the error output
        my $pbs_warnings = '';
        if ( -f $self->err_file ) {
            eval {
                open my $e, '<', $self->err_file
                    or die "WARNING: $! opening err file " . $self->err_file;
                # lexical loop variable: don't clobber the caller's $_
                while ( my $line = <$e> ) {
                    $pbs_warnings .= $line if $line =~ m|^\=\>\> PBS:|;
                }
                $pbs_warnings = __PACKAGE__ . ": resource manager output:\n$pbs_warnings"
                    if $pbs_warnings;
            };
            $pbs_warnings .= $@ if $@;
        }

        # and also prepend the cluster job ID to aid troubleshooting
        my $jobid = $self->job_id;
        $error_string = __PACKAGE__ . ": cluster job id: $jobid\n"
            . $pbs_warnings
            . $error_string
            . '==== ' . __PACKAGE__ . " running qstat -f on this job ===========\n"
            . `qstat -f '$jobid'`
            . '==== ' . __PACKAGE__ . " end qstat output =======================\n";
    }

    # Kill our child process's whole group if it's still running for some
    # reason -- but only with a real pid: kill(KILL, -0) would signal our
    # own process group instead.
    if ( $self->is_async ) {
        my $pid = $self->pid;
        kill SIGKILL => -$pid if $pid && $pid > 0;
    }

    $self->_error_string($error_string);
    if ( $self->_raise_error
        && !( $self->_told_to_die && $error_string =~ /Got signal SIG(INT|QUIT|TERM)/ ) )
    {
        croak( $error_string || 'subprocess died, but returned no error string' );
    }
    return;
}
# runs the completion hook(s) if present
#
# First calls _die_if_error, which croaks if the job's subprocess died
# (and raise_error is set), so hooks do not run for such a job.
# Each hook runs at most once per job object and is called with the job
# object followed by any extra arguments given here.
sub _run_completion_hooks {
    my ( $self, @extra_args ) = @_;

    # if our child died, we should die too, not run the completion hooks
    $self->_die_if_error;

    dbp 'running job completion hook';

    # skip if we have no completion hooks or we have already run them
    my $hooks = $self->_on_completion;
    return if !$hooks or $self->_already_ran_completion_hooks;

    foreach my $hook (@$hooks) {
        $hook->( $self, @extra_args );
    }

    # remember that they have been run
    $self->_already_ran_completion_hooks(1);
}
# True if this job's subprocess has left a 'died' file in its tempdir.
# Non-cluster jobs use a plain -e test on _diefile_name. Cluster jobs scan
# the directory listing instead: NFS caches stat results, so -e could miss
# a file written on another node. Returns 0 if the tempdir cannot be opened.
sub _diefile_exists {
    my ($self) = @_;

    return -e $self->_diefile_name if !$self->is_cluster;

    opendir my $dir_handle, $self->tempdir
        or return 0;
    my $matches = grep { $_ eq 'died' } readdir $dir_handle;
    return $matches ? 1 : 0;
}
# Slurp and return the entire contents of $file as a single string.
# For cluster jobs, uncache() (File::NFSLock) is called on the file first,
# presumably so that output written on another host is visible.
# Confesses if the file cannot be opened.
sub _file_contents {
    my ( $self, $file ) = @_;
    uncache($file) if $self->is_cluster;
    # three-arg open: a two-arg open would strip leading/trailing whitespace
    # from $file and treat leading '<', '>' or '|' as mode characters
    open my $fh, '<', $file or confess "$! reading $file";
    local $/;
    return scalar <$fh>;
}
#takes an error text string, adds some informative context to it (start
#and error times, the failed command line, and the last few lines of the
#captured stdout and stderr), then returns the new string with every line
#prefixed by the package name
sub _format_error_message {
    my $self  = shift;
    my $error = shift || 'no error message';
    $error =~ s/[\.,\s]*$//; #chop off any ending punctuation or whitespace

    my @out_tail = _tail_for_error( 'stdout', $self->out_file );
    my @err_tail = _tail_for_error( 'stderr', $self->err_file );

    return join '', map {chomp; __PACKAGE__.": $_\n"} (
        "start time: ".( $self->start_time ? strftime('%Y-%m-%d %H:%M:%S %Z', localtime($self->start_time) ) : 'NOT RECORDED' ),
        "error time: ".strftime('%Y-%m-%d %H:%M:%S %Z',localtime),
        "command failed: '" . join(' ',@{$self->_command}) . "'",
        $error,
        @out_tail,
        @err_tail,
    );
}

# Returns a header line plus the last 20 lines of $file, or a single
# placeholder line when the output went to a filehandle (or nowhere).
# tail(1) is run without a shell, so spaces or shell metacharacters in
# the filename are passed through literally; an undef filename no longer
# makes tail block reading our STDIN.
sub _tail_for_error {
    my ( $label, $file ) = @_;
    return ("(no $label captured)") if !defined $file || ref $file;

    my $tail_fh;
    unless( open $tail_fh, '-|', 'tail', '-20', $file ) {
        return ( "last few lines of $label:\n", "could not run tail on '$file': $!" );
    }
    my @lines = <$tail_fh>;
    close $tail_fh;
    return ( "last few lines of $label:\n", @lines );
}
=head2 out_file

  Usage: my $file = $run->out_file;
  Desc : get the filename or filehandle that received the
         stdout of the thing you just ran
  Ret  : a filename, or a filehandle if you passed in a filehandle
         with the out_file option to run()
  Args : none

=cut

#out_file() is generated by Class::MethodMaker above

=head2 out

  Usage: print "the program said: ".$run->out."\n";
  Desc : get the STDOUT output from the program as a string.
         Be careful: the output is first written to a tempfile, and
         reading a large one into a string can exhaust your memory.
         Consider using out_file() instead.
  Ret  : string containing the output of the program, or undef
         if you set your out_file to a filehandle
  Args : none

=cut

sub out {
    my ($self) = @_;
    my $destination = $self->out_file;

    # output sent to a caller-supplied filehandle was never captured by us
    return undef if ref $destination;

    $self->dbp( "Outfile is ", $destination, "\n" );
    return $self->_file_contents($destination);
}
=head2 err_file

  Usage: my $err_filename = $run->err_file
  Desc : get the filename or filehandle that received
         the STDERR output
  Ret  : a filename, or a filehandle if you passed in a filehandle
         with the err_file option to run()
  Args : none

=cut

#err_file() is generated by Class::MethodMaker above

=head2 err

  Usage: print "the program errored with ".$run->err."\n";
  Desc : get the STDERR output from the program as a string.
         Be careful: the output is first written to a tempfile, and
         reading a very large one into a string can exhaust your memory.
         Consider using err_file() instead.
  Ret  : string containing the program's STDERR output, or undef
         if you set your err_file to a filehandle
  Args : none

=cut

sub err {
    my ($self) = @_;
    my $destination = $self->err_file;

    # a filehandle destination means there is no file of ours to read
    return undef if ref $destination;

    return $self->_file_contents($destination);
}
=head2 error_string

  Usage: my $err = $runner->error_string;
  Desc : get the text of the error we last die()ed with, if any.
         This is mostly useful after a run() with raise_error
         set to false.
  Ret  : undef if there has been no error,
         or the error string if there has been one
  Args : none
  Side Effects: none

=cut

sub error_string {
    my ($self) = @_;
    return $self->_error_string;
}
=head2 in_file

  Usage: my $infile_name = $run->in_file;
  Desc : get the filename or filehandle used for the process's stdin
  Ret  : whatever you passed in the in_file option to run(), if anything.
         So that would be either a filename or a filehandle.
  Args : none

=cut

#in_file() is defined by Class::MethodMaker above

=head2 working_dir

  Usage: my $dir = $run->working_dir
  Desc : get/set the pathname (string) of the process's working dir.
         If never set, it defaults to File::Spec->curdir (that is, '.',
         the directory the parent process is in).
  Ret  : the current or new value of the working directory
  Args : (optional) new path for the working directory of this process
  Side Effects: gets/sets the working directory where the process is/will be
                running
  Note : attempting to set the working directory on a process that is
         currently running will throw an error, as will setting it to
         something that is not a directory

=cut

sub working_dir {
    my ( $self, $newdir ) = @_;

    if ($newdir) {
        croak "'$newdir' is not a directory" unless -d $newdir;
        croak "cannot set the working dir on a running process" if $self->alive;
        $self->_working_dir($newdir);
    }

    # lazily fall back to the current directory
    unless ( $self->_working_dir ) {
        $self->_working_dir( File::Spec->curdir );
    }
    return $self->_working_dir;
}
=head2 is_async

  Usage: print "It was asynchronous" if $runner->is_async;
  Desc : get/set whether this run was asynchronous (run_async or run_cluster)
  Ret  : the stored flag: true if the run was asynchronous, false if not
  Args : (optional) new value for the flag

=cut

sub is_async {
    my ( $self, @new_value ) = @_;
    ( $self->{is_async} ) = @new_value if @new_value;
    return $self->{is_async};
}
=head2 is_cluster

  Usage: print "It's a cluster job" if $runner->is_cluster;
  Desc : get/set whether this run was done as a job submitted to the cluster
  Ret  : the stored flag: true if it's a cluster job, false if not
  Args : (optional) new value for the flag

=cut

sub is_cluster {
    my ( $self, @new_value ) = @_;
    ( $self->{is_cluster} ) = @new_value if @new_value;
    return $self->{is_cluster};
}
=head2 alive

  Usage: print "It's still there" if $runner->alive;
  Desc : check whether our background process is still alive
  Ret  : false if it's not still running or was synchronous,
         true if it's async or cluster and is still running.
         Additionally, if it's a cluster job, the true value
         returned will be either 'ending', 'running' or 'queued'.
         Cluster jobs that are held, waiting or in transit are
         reported as 'queued', and suspended ones as 'running',
         since none of them has finished yet.
  Args : none
  Side Effects: dies if our background process terminated abnormally;
         runs the completion hooks when the process is found to have
         finished (unless die() was called on it)

=cut

sub alive {
    my ($self) = @_;

    $self->_die_if_error; #if our child died, we should die too

    if( $self->is_async) {
        #use a kill with signal zero to see if that pid is still running
        $self->_reap;
        if( kill 0 => $self->pid ) {
            system("pstree -p | egrep '$$|".$self->pid."'") if DEBUG;
            dbp 'background job '.$self->pid." is alive.\n";
            return 1;
        } else {
            system("pstree -p | egrep '$$|".$self->pid."'") if DEBUG;
            dbp 'background job ',$self->pid," is dead.\n";
            $self->_run_completion_hooks unless $self->_told_to_die;
            return;
        }
    } elsif( $self->is_cluster ) {
        #use qstat to see if the job is still alive.  Besides e/r/q, the
        #PBS/Torque states h (held), w (waiting), t (in transit) and
        #s (suspended) also mean the job has not finished, so they must
        #not be treated as dead (which would fire the completion hooks)
        my %m = ( e => 'ending',  r => 'running', q => 'queued',
                  h => 'queued',  w => 'queued',  t => 'queued',
                  s => 'running',
                );
        my $state = $m{ $self->_qstat->{'job_state'} || '' };
        $self->_run_completion_hooks unless $state || $self->_told_to_die;
        return $state;
    }
    $self->_die_if_error; #if our child died, we should die too
    return;
}
# number of jobs in the (possibly cached) qstat listing
sub _cluster_queue_jobs_count {
    my ($self) = @_;
    my $all_jobs = $self->_global_qstat || {};
    return scalar keys %$all_jobs;
}
# qstat attributes recorded for this job, or an empty hash ref when the
# qstat listing does not mention it
sub _qstat {
    my $self = shift;
    my $all_jobs = $self->_global_qstat;
    my $job_info = $all_jobs->{ $self->_jobid };
    return $job_info || {};
}
#keep a cached copy of the qstat results, updated at most every MIN_QSTAT_WAIT
#seconds, to avoid pestering the server too much

use constant MIN_QSTAT_WAIT => 3;

my $jobstate;        # job id => { attribute => value } from the last qstat
my $last_qstat_time; # time() when $jobstate was last fully refreshed

# make the next _global_qstat() call re-run qstat
sub _flush_qstat_cache {
    $last_qstat_time = 0;
}

# Returns a hashref mapping each job id printed by `qstat -f` to a hashref
# of that job's attributes (keys and values lowercased, and only the first
# whitespace-delimited token of each value kept).  Results are cached for
# MIN_QSTAT_WAIT seconds.  If qstat prints an error, waits 3 seconds and
# retries once; if the retry (or running qstat at all) fails, warns and
# returns an empty hashref without updating the cache time.
# Options: no_recurse => 1 disables the retry.
sub _global_qstat {
    my ($self,%opt) = @_;

    #return our cached job state if it has been updated recently
    unless( defined($last_qstat_time) && (time()-$last_qstat_time) <= MIN_QSTAT_WAIT ) {
        #otherwise, update it and return it
        $jobstate = {};
        my $servername = $self->_jobdest_isset ? $self->_jobdest : '';
        $servername =~ s/^[^@]+//; #< keep only the '@server' part, if any
        # warn "using server name $servername\n";

        # the shell form is kept so that 2>&1 folds qstat's error messages
        # into the stream that is parsed below
        my $qstat;
        unless( open $qstat, "qstat -f $servername 2>&1 |" ) {
            warn __PACKAGE__.": could not run qstat: $!\n";
            return {};
        }
        my $current_jobid;
        while (my $qs = <$qstat>) {
            # dbp "got qstat record:\n$qs";
            if ($qs =~ /\s*Job\s+Id\s*:\s*(\S+)/i) {
                $current_jobid = $1;
            } elsif ( my ($key,$val) = $qs =~ /^\s*([^=\s]+)\s*=\s*(\S+)/ ) {
                next if $key =~ /[=:]/;
                # ignore attribute lines seen before any job id, instead of
                # filing them under an undef (empty-string) job id
                next unless defined $current_jobid;
                $jobstate->{$current_jobid}->{lc($key)} = lc $val;
            } elsif ( $qs =~ /qstat: (.+)/ ) { #< probably some kind of error
                if( $opt{no_recurse} ) {
                    warn $qs;
                    warn $_ while <$qstat>;
                    return {};
                } else {
                    sleep 3; #< wait a bit and try a second time
                    return $self->_global_qstat( no_recurse => 1 );
                }
            }
        }
        close $qstat;
        $last_qstat_time = time();
        # use Data::Dumper;
        # warn "qstat hash is now: ".Dumper($jobstate);
    }

    return $jobstate;
}
=head2 wait

  Usage: my $status = $job->wait;
  Desc : block until our job finishes.  If the job was run
         synchronously, this returns immediately.
  Ret  : the exit status ($?) of the job
  Args : none

=cut

sub wait {
    my ($self) = @_;

    $self->_die_if_error;

    if ( $self->is_async && $self->alive ) {
        # backgrounded job: block in waitpid until it exits
        $self->_reap(1);
    }
    elsif ( $self->is_cluster && $self->alive ) {
        # cluster job: poll every couple of seconds until it is gone
        while (1) {
            sleep 2;
            $self->_die_if_error;
            last unless $self->alive;
        }
    }

    CORE::die('sanity check failed, process is still alive') if $self->alive;
    $self->_die_if_error;
    return $self->exit_status;
}
=head2 die

  Usage: die "Could not kill job!" unless $runner->die;
  Desc : Reliably try to kill the process, if it is being run
         in the background.  For a background (async) job, the following
         signals are sent to the process at one second intervals until
         the process dies: QUIT, INT, TERM, KILL.  For a cluster job,
         qdel is run, and run a second time if the job still shows up
         as alive.
  Ret  : 1 if the process no longer exists once die has completed, 0 otherwise.
         Will always return 1 if this process was not run in the background.
  Args : none
  Side Effects: tries really hard to kill our background process;
         dies if a cluster job is still alive after the second qdel

=cut

sub die {
    my ($self) = @_;
    $self->_told_to_die(1);
    if($self->is_async) {
        $self->_reap; #reap if necessary
        my @signal_sequence = qw/SIGQUIT SIGINT SIGTERM SIGKILL/;
        foreach my $signal (@signal_sequence) {
            if(kill $signal => $self->pid) {
                dbp "DIE(".$self->pid.") OK with signal $signal";
            } else {
                dbp "DIE(".$self->pid.") failed with signal $signal";
            }
            sleep 1;
            $self->_reap; #reap if necessary
            last unless $self->alive;
        }
        $self->_reap; #reap if necessary
        return $self->alive ? 0 : 1;
    } elsif( $self->is_cluster ) {
        $self->_flush_qstat_cache; #< force a qstat update
        dbp "trying first run qdel ",$self->_jobid,"\n";
        if($self->alive) {
            my $jobid = $self->_jobid;
            my $qdel = `qdel $jobid 2>&1`;
            if ($self->alive) {
                sleep 3; #wait a bit longer
                if ($self->alive) { #try the del again
                    dbp "trying again qdel ",$self->_jobid,"\n";
                    $qdel = `qdel $jobid 2>&1`;
                    sleep 7; #wait again for it to take effect
                    if ($self->alive) {
                        # CORE:: is spelled out because this package defines
                        # its own die() method, which makes a bare die()
                        # call ambiguous
                        CORE::die("Unable to kill cluster job ".$self->_jobid.", qdel output: $qdel" );
                    }
                }
            }
        }
    }
    return 1;
}
=head2 pid

  Usage: my $pid = $runner->pid
  Ret  : the PID of our background process, or
         undef if this command was not run asynchronously
  Args : none
  Side Effects: none

=cut

# read-only front end for the _pid accessor
sub pid {
    my ($self) = @_;
    return $self->_pid;
}
=head2 job_id

  Usage: my $jobid = $runner->job_id;
  Ret  : the job ID of our cluster job if this was a cluster job, undef otherwise
  Args : none
  Side Effects: none

=cut

# read-only front end for the _jobid accessor
sub job_id {
    my ($self) = @_;
    return $self->_jobid;
}
=head2 host

  Usage: my $host = $runner->host
  Desc : get the hostname of the host that ran or is running this job
  Ret  : hostname, or undef if the job has not been run (yet)
  Args : none

=cut

sub host {
    my ($self) = @_;

    # an already-recorded value wins
    return $self->_host if $self->_host_isset;

    # only background/cluster jobs learn their host from the status file
    unless ( $self->is_async || $self->is_cluster ) {
        confess 'should have a hostname by now';
    }

    $self->_read_status_file;
    return $self->_host;
}
=head2 start_time

  Usage: my $start = $runner->start_time;
  Desc : get the number returned by time() for when this process
         was started
  Ret  : result of time() for just before the process was started
  Args : none

=cut

sub start_time {
    my ($self) = @_;

    # an already-recorded value wins
    return $self->_start_time if $self->_start_time_isset;

    # only background/cluster jobs learn their start time from the status file
    unless ( $self->is_async || $self->is_cluster ) {
        confess 'should have a start time by now';
    }

    $self->_read_status_file;
    return $self->_start_time;
}
=head2 end_time

  Usage: my $elapsed = $runner->end_time - $runner->start_time;
  Desc : get the number returned by time() for when this process was
         first noticed to have stopped.
  Ret  : time()-type number, or undef if a background or cluster job
         is still running
  Args : none

This end time is approximate, since I haven't yet figured out a way
to get an asynchronous notification when a process finishes that isn't
necessarily a child of this process. So as a kludge, pretty much every
method you call on this object checks whether the process has finished and
sets the end time if it has.

=cut

sub end_time {
    my $self = shift;
    if($self->is_async) {
        $self->_reap;
        return undef if $self->alive;
        $self->_read_status_file;
    } elsif( $self->is_cluster ) {
        # cluster jobs record their end time in the status file as well;
        # without reading it here, end_time() stayed undef unless some other
        # accessor happened to read the status file first
        return undef if $self->alive;
        $self->_read_status_file;
    }
    return $self->_end_time;
}
# Parse the job's status file (tempdir/status, as written by run3 with
# lines "start:N", "end:N", "ret:N" and "host:NAME") and record the
# values found.  Only applies to async and cluster jobs, and is skipped
# once the end time is known.  Fields missing from the file are left
# unset rather than stored as undef: storing undef would mark e.g. the
# host as "set", so host() would keep returning undef even after the job
# had finished and written its host line.
sub _read_status_file {
    my $self = shift;

    return unless $self->is_async || $self->is_cluster; #this only applies to async and cluster jobs
    return if $self->_end_time_isset;

    my $statname = File::Spec->catfile( $self->tempdir, 'status');
    uncache($statname) if $self->is_cluster;
    dbp("attempting to open status file $statname\n");
    open my $statfile, '<', $statname
        or return;
    my ($host,$start,$end,$ret);
    while(<$statfile>) {
        dbp($_);
        if( /^start:(\d+)/ ) {
            $start = $1;
        } elsif( /^end:(\d+)/) {
            $end = $1;
        } elsif( /^ret:(\d+)/) {
            $ret = $1;
        } elsif( /^host:(\S+)/) {
            $host = $1;
        } else {
            dbp("no match: $_");
        }
    }
    close $statfile;

    $self->_start_time($start) if defined $start;
    $self->_host($host)        if defined $host;
    $self->_end_time($end)     if defined $end;
    $self->_exit_status($ret)  if defined $ret;
}
=head2 exit_status

  Usage: my $status = $runner->exit_status
  Desc : get the exit status of the thing that just ran
  Ret  : undef if the thing hasn't finished yet, otherwise
         the exit status ($?) of the program.
         For how to interpret this value, see perlvar.
  Args : none

=cut

sub exit_status {
    my ($self) = @_;
    # consult the status file only when no status has been recorded yet
    $self->_read_status_file unless $self->_exit_status_isset;
    return $self->_exit_status;
}
=head2 cleanup

  Usage: $runner->cleanup;
  Desc : delete temp storage associated with this object, if any
  Ret  : 1 on success, dies on failure
  Args : none
  Side Effects: deletes any temporary files or directories associated
                with this object, plus any now-empty intermediate
                directories between the tempdir and the
                cxgn-tools-run-tempfiles directory it lives under

Cleanup is done automatically for run() jobs, but not run_async()
or run_cluster() jobs.

=cut

sub cleanup {
    my ($self) = @_;
    $self->_reap if $self->is_async;

    # assemble list of stem directories to try to delete (rmdir() only
    # succeeds on the ones that are empty)
    # WARNING THIS WORKS ONLY ON UNIX-STYLE PATHS RIGHT NOW

    my @delete_dirs;
    if( my $t = $self->{tempdir} ) {
        $t =~ s!/$!!; #< remove any trailing slash
        # Only walk upward when a path component ending in
        # cxgn-tools-run-tempfiles lies above the tempdir; that component is
        # where the walk stops.  Without this check, a tempdir located
        # elsewhere made the loop climb all the way to / and rmdir() every
        # empty ancestor directory.
        if( $t =~ m!cxgn-tools-run-tempfiles/! ) {
            while( $t =~ s!/[^/]+$!! && $t !~ /cxgn-tools-run-tempfiles$/ ) {
                push @delete_dirs, $t;
            }
        }
    }

    if( $self->{tempdir} && -d $self->{tempdir} ) {
        rmtree($self->{tempdir}, DEBUG ? 1 : 0);
    }

    rmdir $_ foreach @delete_dirs;

    return 1;
}
=head2 do_not_cleanup

  Usage: $runner->do_not_cleanup;
  Desc : get/set the flag that disables automatic cleaning up of this
         object's tempfiles when it goes out of scope
  Args : true to set, false to unset (omit or pass undef to just read it)
  Ret  : current value of the flag (0 if it was never set)

=cut

sub do_not_cleanup {
    my ( $self, $flag ) = @_;

    $self->{do_not_cleanup} = $flag if defined $flag;

    # default to "do clean up" when the flag has never been set
    $self->{do_not_cleanup} = 0 unless defined $self->{do_not_cleanup};

    return $self->{do_not_cleanup};
}
=head2 property()

Get or set a key => value pair in the $self->{properties} namespace,
for attaching custom properties to jobs.

  Args: Key, Value (optional; a defined value is stored under Key)
  Ret : the value stored under Key (nothing if Key is undef)
  Example: $job->property("file_written", 1);
           do_something() if $job->property("file_written");

=cut

sub property {
    my ( $self, $key, $value ) = @_;
    return unless defined $key;
    $self->{properties}{$key} = $value if defined $value;
    return $self->{properties}{$key};
}
# Destructor: kills the job if _die_on_destroy is set, reaps an async
# child, calls uncache() on a cluster job's output files (those given as
# filenames), and removes tempfiles unless the job is async/cluster, the
# tempdir was pre-existing, do_not_cleanup is set, or DEBUG is on.
sub DESTROY {
    my $self = shift;
    # DESTROY can run at any time, including during exit() or while an
    # exception propagates.  _reap() calls waitpid() and die()/alive()
    # reach an eval in _die_if_error, so without this the caller's $? (the
    # program's exit status) and $@ could be clobbered.
    local ( $?, $@ );
    $self->die if( $self->_die_on_destroy );
    $self->_reap if $self->is_async;
    if( $self->is_cluster ) {
        uncache($self->out_file) unless ref $self->out_file;
        # test err_file itself (this used to test out_file by mistake)
        uncache($self->err_file) unless ref $self->err_file;
    }
    $self->cleanup unless $self->_existing_temp || $self->is_async || $self->is_cluster || $self->do_not_cleanup || DEBUG;
}
# Reap our background child if it has exited, recording its wait status
# via _exit_status().  With a true argument this blocks until the child
# exits; otherwise the check is non-blocking (WNOHANG).
sub _reap {
    my $self = shift;
    my $hang = shift() ? 0 : WNOHANG;
    # keep waitpid's raw return value (pid, 0 = still running, -1 = no such
    # child).  The old "my $res = waitpid(...) > 0" stored the result of the
    # comparison instead, because '>' binds tighter than '='.
    my $res = waitpid($self->pid, $hang);
    if ( $res > 0 ) {
        # We reaped a truly running process
        $self->_exit_status($?);
        dbp("reaped ".$self->pid);
    } else {
        dbp("reaper: waitpid(".$self->pid.",$hang) returned $res");
    }
}
1595 =head1 SEE ALSO
1597 L<IPC::Run> - the big kahuna
L<IPC::Run3> - CXGN::Tools::Run uses CXGN::Tools::Run::Run3, which is
basically a copy of IPC::Run3 (version 0.030) in which the signal
handling has been tweaked.
L<Proc::Background> - this module's interface is loosely modeled on this one
L<Proc::Simple> - this module takes a lot of code from this one
1607 L<Expect> - great for interacting with your subprocess
1610 This module blends ideas from the two CPAN modules L<Proc::Simple> and
1611 L<IPC::Run3>, though it does not directly use either of them. Rather,
1612 processes are run with L<CXGN::Tools::Run::Run3>, the code of which
1613 has been forked from IPC::Run3 version 0.030. The backgrounding is
1614 all handled in this module, in a way that was inspired by the way
1615 L<Proc::Simple> does things. The interface exported by this module is
1616 very similar to L<Proc::Background>, though the implementation is
1617 different.
1619 =head1 AUTHOR
1621 Robert Buels
1623 =cut
package CXGN::Tools::Run::Run3;

# =head1 NAME
#
# CXGN::Tools::Run::Run3 - modified version of IPC::Run3 version 0.030,
# used by L<CXGN::Tools::Run>. This module is really only intended for use by
# L<CXGN::Tools::Run>.
#
# =head1 SYNOPSIS
#
#   # NOTE(review): no Exporter setup is visible in this package, so run3()
#   # is presumably called fully qualified -- confirm against callers
#   CXGN::Tools::Run::Run3::run3( \@cmd, \$in, \$out, \$err );
#   CXGN::Tools::Run::Run3::run3( \@cmd, \@in, \&out, \$err );
#
# =cut

use strict;
use constant debugging => $ENV{CXGNTOOLSRUNDEBUG} || 0;

use Config;

use Carp qw( croak );
use File::Temp qw( tempfile );
use POSIX qw( dup dup2 );

# We cache the handles of our temp files in order to
# keep from having to incur the (largish) overhead of File::Temp
my %fh_cache;

# Prepare the handle the child will read as its STDIN, according to the
# type of $source reported by _type():
#   undef              -> returns undef: the child inherits the parent's STDIN
#   '' (a filename)    -> that file, opened for reading
#   FH                 -> the caller's own filehandle
#   SCALAR/ARRAY/CODE  -> the data is written to a cached temp file, which is
#                         rewound and returned (\undef gives an empty file)
# Croaks if the file cannot be opened or the temp file cannot be rewound,
# and dies if writing the temp file fails.  $binmode_it is accepted but
# not used.
sub _spool_data_to_child {
    my ( $type, $source, $binmode_it ) = @_;

    # If undef (not \undef) passed, they want the child to inherit
    # the parent's STDIN.
    return undef unless defined $source;

    my $fh;
    if ( ! $type ) {
        # three-arg open: the filename is used literally, so whitespace or
        # mode characters in it are not reinterpreted
        open my $in_file, '<', $source or croak "$!: $source";
        $fh = $in_file;
        warn "run3(): feeding file '$source' to child STDIN\n"
            if debugging >= 2;
    } elsif ( $type eq "FH" ) {
        $fh = $source;
        warn "run3(): feeding filehandle '$source' to child STDIN\n"
            if debugging >= 2;
    } else {
        $fh = $fh_cache{in} ||= tempfile;
        truncate $fh, 0;
        seek $fh, 0, 0;
        if ( $type eq "SCALAR" ) {

            # When the run3()'s caller asks to feed an empty file
            # to the child's stdin, we want to pass a live file
            # descriptor to an empty file (like /dev/null) so that
            # they don't get surprised by invalid fd errors and get
            # normal EOF behaviors.
            return $fh unless defined $$source; # \undef passed

            warn "run3(): feeding SCALAR to child STDIN",
                debugging >= 3
                    ? ( ": '", $$source, "' (", length $$source, " chars)" )
                    : (),
                "\n"
                if debugging >= 2;

            print $fh $$source or die "$! writing to temp file";

        } elsif ( $type eq "ARRAY" ) {
            warn "run3(): feeding ARRAY to child STDIN",
                debugging >= 3 ? ( ": '", @$source, "'" ) : (),
                "\n"
                if debugging >= 2;

            print $fh @$source or die "$! writing to temp file";
        } elsif ( $type eq "CODE" ) {
            warn "run3(): feeding output of CODE ref '$source' to child STDIN\n"
                if debugging >= 2;
            my $parms = []; # TODO: get these from $options
            while (1) {
                my $data = $source->( @$parms );
                last unless defined $data;
                print $fh $data or die "$! writing to temp file";
            }
        }

        # Always rewind (which also flushes Perl's pending buffered writes),
        # so the child reads everything from the start.  Rewinding only
        # when the *last* chunk was non-empty left the child at EOF when a
        # CODE source ended with an empty string.
        seek $fh, 0, 0 or croak "$! seeking on temp file for child's stdin";
    }

    croak "run3() can't redirect $type to child stdin"
        unless defined $fh;

    return $fh;
}
# Choose the filehandle that the child's $what ("stdout" or "stderr") is
# redirected to, according to the destination type from _type():
#   \undef             -> a cached handle on File::Spec->devnull
#   FH                 -> the caller's own filehandle
#   '' (a filename)    -> that file, opened for writing (truncated)
#   other references   -> a cached, emptied temp file, read back later by
#                         _read_child_output_fh()
# Croaks if the null device or the named file cannot be opened.
# $binmode_it is accepted but not used.
sub _fh_for_child_output {
    my ( $what, $type, $dest, $binmode_it ) = @_;

    my $fh;
    if ( $type eq "SCALAR" && $dest == \undef ) {
        warn "run3(): redirecting child $what to oblivion\n"
            if debugging >= 2;

        # the open is checked so that a failed open is reported instead of
        # an unopened handle being cached and reused
        $fh = $fh_cache{nul} ||= do {
            my $devnull = File::Spec->devnull;
            open my $null_fh, '>', $devnull or croak "$!: $devnull";
            $null_fh;
        };
    } elsif ( $type eq "FH" ) {
        $fh = $dest;
        warn "run3(): redirecting $what to filehandle '$dest'\n"
            if debugging >= 3;
    } elsif ( !$type ) {
        warn "run3(): feeding child $what to file '$dest'\n"
            if debugging >= 2;

        # three-arg open: the filename is used literally
        open my $out_file, '>', $dest or croak "$!: $dest";
        $fh = $out_file;
    } else {
        warn "run3(): capturing child $what\n"
            if debugging >= 2;

        $fh = $fh_cache{$what} ||= tempfile;
        seek $fh, 0, 0;
        truncate $fh, 0;
    }

    return $fh;
}
# Copy what the child wrote into the temp file $fh to the caller's
# destination:
#   SCALAR ref -> filled with the whole output, read in 10_000-byte pieces
#   ARRAY ref  -> one element per line
#   CODE ref   -> called once per line
# Nothing happens for \undef destinations; other types croak.
sub _read_child_output_fh {
    my ( $what, $type, $dest, $fh, $options ) = @_;

    # output sent to oblivion was never captured
    return if $type eq "SCALAR" && $dest == \undef;

    seek $fh, 0, 0 or croak "$! seeking on temp file for child $what";

    if ( $type eq "SCALAR" ) {
        warn "run3(): reading child $what to SCALAR\n"
            if debugging >= 3;

        # the first read() fills $$dest from offset 0; each later one
        # appends at the current end of the string
        my $nread = read $fh, $$dest, 10_000;
        while (1) {
            defined $nread
                or croak "$! reading child $what from temp file";
            last if !$nread;

            warn "run3(): read $nread bytes from child $what",
                debugging >= 3 ? ( ": '", substr( $$dest, -$nread ), "'" ) : (),
                "\n"
                if debugging >= 2;

            $nread = read $fh, $$dest, 10_000, length $$dest;
        }
    }
    elsif ( $type eq "ARRAY" ) {
        @$dest = <$fh>;
        if ( debugging >= 2 ) {
            my $total_bytes = 0;
            $total_bytes += length $_ foreach @$dest;
            warn
                "run3(): read ",
                scalar @$dest,
                " records, $total_bytes bytes from child $what",
                debugging >= 3 ? ( ": '", @$dest, "'" ) : (),
                "\n";
        }
    }
    elsif ( $type eq "CODE" ) {
        warn "run3(): capturing child $what to CODE ref\n"
            if debugging >= 3;

        local $_;
        while ( defined( $_ = <$fh> ) ) {
            warn
                "run3(): read ",
                length,
                " bytes from child $what",
                debugging >= 3 ? ( ": '", $_, "'" ) : (),
                "\n"
                if debugging >= 2;

            $dest->( $_ );
        }
    }
    else {
        croak "run3() can't redirect child $what to a $type";
    }
}
# Classify a redirection target: "FH" for filehandles (IO::Handle objects
# or globs), otherwise whatever ref() says ('' for a plain filename).
sub _type {
    my ( $redir ) = @_;
    # anything that behaves like an IO::Handle is a filehandle
    return "FH" if eval { $redir->isa("IO::Handle") };
    my $kind = ref $redir;
    return "FH" if $kind eq "GLOB";
    return $kind;
}
# Probe for the lowest free file descriptor number: dup() returns it,
# and the duplicate is closed again right away.
sub _max_fd {
    my $probe = dup(0);
    POSIX::close($probe);
    return $probe;
}
my $run_call_time;
my $sys_call_time;
my $sys_exit_time;

# Run $cmd (an array ref of program + arguments, or a string for the shell)
# in a forked child whose STDIN/STDOUT/STDERR are redirected according to
# $stdin, $stdout and $stderr (see _spool_data_to_child and
# _fh_for_child_output for the accepted forms).  If $tempdir is given, a
# "start:" line is written to "$tempdir/status" before the run, and "end:",
# "ret:" (the raw wait status $?) and "host:" lines are appended after it.
# While waiting, QUIT/INT/TERM are forwarded to the child (KILL is listed
# too but cannot be trapped), after which run3 dies "Got signal SIG...".
# Dies if the command exits non-zero or is killed by a signal.  A trailing
# hash ref of options (binmode_stdin/binmode_stdout/binmode_stderr) is
# passed through to the helpers.  Returns 1 on success.
sub run3 {

    my $options = @_ && ref $_[-1] eq "HASH" ? pop : {};

    my ( $cmd, $stdin, $stdout, $stderr, $tempdir ) = @_;

    print STDERR "run3(): running ",
        join( " ", map "'$_'", ref $cmd ? @$cmd : $cmd ),
        "\n"
        if debugging;

    if($tempdir) {
        if( open(my $statfile,">","$tempdir/status") ) {
            print $statfile "start:",time,"\n";
        } else {
            warn "run3(): $! writing $tempdir/status\n";
        }
    }

    if ( ref $cmd ) {
        croak "run3(): empty command" unless @$cmd;
        croak "run3(): undefined command" unless defined $cmd->[0];
        croak "run3(): command name ('')" unless length $cmd->[0];
    } else {
        croak "run3(): missing command" unless @_;
        croak "run3(): undefined command" unless defined $cmd;
        croak "run3(): command ('')" unless length $cmd;
    }

    my $in_type = _type( $stdin );
    my $out_type = _type( $stdout );
    my $err_type = _type( $stderr );

    # This routine procedes in stages so that a failure in an early
    # stage prevents later stages from running, and thus from needing
    # cleanup.

    print STDERR "run3(): in_type=$in_type, out_type=$out_type, err_type=$err_type\n"
        if debugging;

    # Declared unconditionally and assigned conditionally: the behaviour of
    # "my $x = ... if COND" is undefined in Perl (see perlsyn).
    my ( $in_fh, $out_fh, $err_fh );

    $in_fh = _spool_data_to_child( $in_type, $stdin, $options->{binmode_stdin} )
        if defined $stdin;

    $out_fh = _fh_for_child_output( "stdout", $out_type, $stdout, $options->{binmode_stdout} )
        if defined $stdout;

    my $tie_err_to_out =
        defined $stderr && defined $stdout && $stderr eq $stdout;

    if ( defined $stderr ) {
        $err_fh = $tie_err_to_out
            ? $out_fh
            : _fh_for_child_output( "stderr", $err_type, $stderr, $options->{binmode_stderr} );
    }

    # this should make perl close these on exceptions
    #local *STDIN_SAVE;
    local *STDOUT_SAVE;
    local *STDERR_SAVE;

    my $saved_fd0;
    $saved_fd0 = dup( 0 ) if defined $in_fh;

    # open STDIN_SAVE, "<&STDIN"# or croak "run3(): $! saving STDIN"
    #     if defined $in_fh;
    open STDOUT_SAVE, ">&STDOUT" or croak "run3(): $! saving STDOUT"
        if defined $out_fh;
    open STDERR_SAVE, ">&STDERR" or croak "run3(): $! saving STDERR"
        if defined $err_fh;

    my $ok = eval {
        # The open() call here seems to not force fd 0 in some cases;
        # I ran in to trouble when using this in VCP, not sure why.
        # the dup2() seems to work.
        dup2( fileno $in_fh, 0 )
        # open STDIN, "<&=" . fileno $in_fh
            or croak "run3(): $! redirecting STDIN"
            if defined $in_fh;

        # close $in_fh or croak "$! closing STDIN temp file"
        #     if ref $stdin;

        open STDOUT, ">&" . fileno $out_fh
            or croak "run3(): $! redirecting STDOUT"
            if defined $out_fh;

        open STDERR, ">&" . fileno $err_fh
            or croak "run3(): $! redirecting STDERR"
            if defined $err_fh;

        my $host = `hostname`;
        my ($user) = getpwuid( $< );
        chomp $host;
        my $cmd_pid;
        my $r = do {

            my $pid = fork;
            defined($pid) or die "Could not fork!";
            unless($pid) {
                if(ref $cmd) {
                    exec { $cmd->[0] } @$cmd;
                    warn "exec failed for cmd ".join(' ',@$cmd).": $!\n";
                } else {
                    exec $cmd;
                    warn "exec failed for cmd $cmd: $!\n";
                }
                POSIX::_exit(-1); #call a HARD exit to avoid running any weird END blocks
            }

            $cmd_pid = $pid;
            #forward 'stop!' signals to our child process, then heed them ourselves.
            #The handlers are local()ized, so the caller's own handlers come
            #back when this block exits; otherwise they stayed installed after
            #run3() returned, silently swallowing later signals and forwarding
            #them to a pid that no longer exists.
            my $we_get_signal; #main screen turn on
            my @stop_signals = qw/ QUIT INT TERM KILL /;
            local @SIG{@stop_signals} = map {
                my $sig = $_;
                sub { kill "SIG$sig" => $pid; $we_get_signal = $sig; };
            } @stop_signals;
            my $ret = waitpid($pid,0); #wait for child to finish
            if ($tempdir) {
                if( open(my $statfile,">>","$tempdir/status") ) {
                    print $statfile "end:",time,"\n";
                    print $statfile "ret:$?\n";
                    print $statfile "host:$host\n";
                } else {
                    warn "run3(): $! writing $tempdir/status\n";
                }
            }
            die "Got signal SIG$we_get_signal\n" if $we_get_signal;
            #how are you gentlemen!
            $ret
        };

        my $exval = $? >> 8;
        my $sig = $?&127;
        unless ( defined $r && $r != -1 && $exval == 0 && $sig == 0) {
            if ( debugging ) {
                my $err_fh = defined $err_fh ? \*STDERR_SAVE : \*STDERR;
                print $err_fh "run3(): system() error $!\n"
            }

            my @signames = split / /,$Config{sig_name};
            die "Command failed on host '$host', user '$user', local monitor pid $$, cmd pid $cmd_pid, \$?=$?, exit value $exval, signal $signames[$sig] ($sig), \$r=$r, \$!='$!' (string could be spurious)\n";
        }

        if ( debugging ) {
            my $err_fh = defined $err_fh ? \*STDERR_SAVE : \*STDERR;
            print $err_fh "run3(): \$? is $?, \$r is $r\n"
        }
        1;
    };

    my $x = $@;

    my @errs;

    if ( defined $saved_fd0 ) {
        dup2( $saved_fd0, 0 );
        POSIX::close( $saved_fd0 );
    }

    # open STDIN, "<&STDIN_SAVE"# or push @errs, "run3(): $! restoring STDIN"
    #     if defined $in_fh;
    open STDOUT, ">&STDOUT_SAVE" or push @errs, "run3(): $! restoring STDOUT"
        if defined $out_fh;
    open STDERR, ">&STDERR_SAVE" or push @errs, "run3(): $! restoring STDERR"
        if defined $err_fh;

    die join ", ", @errs,"\n" if @errs;

    die "$x\n" unless $ok;

    _read_child_output_fh( "stdout", $out_type, $stdout, $out_fh, $options )
        if defined $out_fh && $out_type && $out_type ne "FH";
    _read_child_output_fh( "stderr", $err_type, $stderr, $err_fh, $options )
        if defined $err_fh && $err_type && $err_type ne "FH" && !$tie_err_to_out;

    return 1;
}

# =head1 AUTHORS
#
# Barrie Slaymaker E<lt>C<barries@slaysys.com>E<gt>.
#
# Ricardo SIGNES E<lt>C<rjbs@cpan.org>E<gt> performed some routine maintenance in
# 2005, thanks to help from the following ticket and/or patch submitters: Jody
# Belka, Roderich Schupp, David Morel, and anonymous others.
#
# Robert Buels E<lt>C<rmb32@cornell.edu>E<gt> then gutted and lobotomized it
# for his own nefarious purposes.
#
# =cut


1;#do not remove