1 package CXGN
::Tools
::Cluster
;
3 use constant DEBUG
=> $ENV{CLUSTER_DEBUG
};
6 use Time
::HiRes qw
/usleep/;
14 Base class for cluster programs, such as ModelPAUP, SignalP, and more!
18 my $proc = CXGN::Tools::Cluster::(Program)->new({
19 in => $input_filepath,
20 out => $output_filepath,
30 print STDERR
"\nDEBUG MODE\n" if DEBUG
;
35 Args: Argument hash reference, with
36 in => (optional) input file, if you are splitting one file.
37 If you don't use this, you should send an input file
38 as an argument to the submit() subroutine
39 out => output result file
40 cluster_host => cluster host name, defaults to "solanine"
41 job_wait => seconds to wait between qstat calls while spinning, defaults to 10
49 my $self = bless {}, $class;
52 $self->cluster_host($args->{cluster_host
});
53 $self->job_wait($args->{job_wait
});
54 $self->tmp_base($args->{tmp_base
});
55 $self->infile($args->{in});
56 $self->outfile($args->{out
});
58 $self->cluster_host("solanine") unless $self->cluster_host();
59 $self->run_locally($args->{run_locally
});
60 $self->tmp_base(".") if($self->run_locally() && !$args->{tmp_base
});
61 $self->tmp_base("/data/shared/tmp") unless $self->tmp_base();
62 $self->job_wait(10) unless $self->job_wait();
69 This function should either:
70 1) Use the infile, split it up, submit all jobs, and set cluster_outs() and jobs(); or
71 2) Take one input file, submit one job, and push the cluster outfile and job onto the
72 cluster_outs() and jobs() arrays.
77 die "Override this function in a subclass"
82 Prevent submission to qsub from happening too quickly. Call $self->chill()
83 before submitting a job in subclasses
91 usleep
($msec); #4 per second, at most
96 Returns 1 if jobs are still running, 0 if all jobs are done (or no jobs exist)
102 my $job_array = $self->jobs();
104 foreach(@
$job_array){
105 $running = 1 if $_->alive();
112 Keeps checking <-> sleeping until all the jobs are
115 Args: (optional) wait time in seconds between qstat calls,
116 uses $self->job_wait() otherwise
123 my $wait_time = shift;
124 $wait_time ||= $self->job_wait();
125 print STDERR
"\nAll jobs submitted, now we wait...";
126 while($self->alive()){
128 print STDERR
"." if DEBUG
;
134 A handy little utility to get chunks of roughly equal size
135 Args: Total Size, Minimum # of Pieces, Maximum Piece Size
136 Ret: An array of integers, each one a piece size for a chunk
142 my ($total_size, $min_pieces, $max_piece_size) = @_;
145 my $first_size = ceil
($total_size / $min_pieces);
147 my $piece_size = $max_piece_size + 1;
148 if($first_size <= $max_piece_size){
149 $piece_size = $first_size;
152 $piece_size = $max_piece_size;
153 my $num_pieces = ceil
($total_size / $piece_size);
154 until($num_pieces >= $min_pieces){
155 $piece_size = int($piece_size * 0.5);
156 $num_pieces = ceil
($total_size / $piece_size);
161 until($sum >= $total_size){
162 my $remaining = $total_size - $sum;
163 if($remaining <= $piece_size){
164 push(@sizes, $remaining);
169 push(@sizes, $piece_size);
177 Concatenates all of the cluster_outs() into outfile()
183 my $outfiles = $self->cluster_outs();
184 open(WF
, ">" . $self->outfile())
185 or die "\nCan't open final write file: $!";
186 print STDERR
"\nConcatenating cluster outputs to final file";
189 print WF
$_ while(<RF
>);
196 =head2 push_job() and push_cluster_out()
198 *Push a job onto the jobs() array ref
199 *Push an output file onto the cluster_outs() array ref
207 my $jobarray = $self->jobs();
208 push(@
$jobarray, $job);
209 $self->jobs($jobarray); #I don't need to do this, do I?
212 sub push_cluster_out
{
214 my $cluster_out = shift;
215 return unless $cluster_out;
216 my $array = $self->cluster_outs();
217 push(@
$array, $cluster_out);
218 $self->cluster_outs($array); #I don't need to do this, do I?
221 =head2 Getter/Setters
223 jobs() - an array reference of the jobs returned by CXGN::Tools::Run
224 outfile() - the final output file of the process, usually concatenated
225 from the cluster outputs
226 infile() - the original input file for the process
227 cluster_outs() - array reference to cluster output files, as you choose
228 them to be. Standard concat() function takes these
229 and glues them together into the outfile()
230 temp_dir() - the temporary directory where all the cluster outputs and
231 cluster process information is stored. Usually a subdirectory
232 of /data/shared/tmp, but whatever you want it to be
233 tmp_base() - base directory for temporary files, use this to build temp_dir(),
234 defaults to "/data/shared/tmp"
235 cluster_host() - the name of the cluster server, defaults to "solanine"
236 job_wait() - seconds to wait before checking qstat again, defaults to 10
237 run_locally() - flag to run the process locally instead of on the cluster (say whaaat?)
238 this can be implemented in subclasses however you like
246 if($jobs && ref($jobs) eq "ARRAY"){
247 $self->{jobs
} = $jobs;
249 return $self->{jobs
};
256 $self->{outfile
} = $outfile;
258 return $self->{outfile
};
265 $self->{stdout
} = $stdout;
267 return $self->{stdout
};
274 $self->{stderr
} = $stderr;
276 return $self->{stderr
};
283 $self->{infile
} = $infile;
285 return $self->{infile
};
290 my $cluster_outs = shift;
291 if($cluster_outs && ref($cluster_outs) eq "ARRAY"){
292 $self->{cluster_outs
} = $cluster_outs;
294 return $self->{cluster_outs
};
299 my $temp_dir = shift;
301 $self->{temp_dir
} = $temp_dir;
303 return $self->{temp_dir
};
308 my $cluster_host = shift;
309 $self->{cluster_host
} = $cluster_host if $cluster_host;
310 return $self->{cluster_host
};
315 my $job_wait = shift;
316 $self->{job_wait
} = $job_wait if $job_wait;
317 return $self->{job_wait
};
322 my $tmp_base = shift;
323 $self->{tmp_base
} = $tmp_base if $tmp_base;
324 return $self->{tmp_base
};
330 return $self->{run_locally
} unless defined $flag;
331 $self->{run_locally
} = $flag;