a class to extract sequences from the genome
[cxgn-corelibs.git] / lib / CXGN / Tools / Cluster.pm
blob4313e0945570825df38dd6be6644ae046a5ed511
1 package CXGN::Tools::Cluster;
2 use strict;
3 use constant DEBUG => $ENV{CLUSTER_DEBUG};
4 use POSIX qw/ ceil /;
6 use Time::HiRes qw/usleep/;
=head1 NAME

CXGN::Tools::Cluster

=head1 SYNOPSIS

Base class for cluster programs, such as ModelPAUP, SignalP, and more!

=head1 USAGE

Here C<(Program)> stands for the name of a concrete subclass:

 my $proc = CXGN::Tools::Cluster::(Program)->new({
     in           => $input_filepath,
     out          => $output_filepath,
     cluster_host => "solanine",
     job_wait     => 10 });
 $proc->submit();
 $proc->spin();
 #Done!

=cut
BEGIN {
    # DEBUG mirrors $ENV{CLUSTER_DEBUG}; announce debug mode at compile time.
    if (DEBUG) {
        print STDERR "\nDEBUG MODE\n";
    }
}
=head2 new()

 Args: Argument hash reference (optional), with
       in           => (optional) input file, if you are splitting one file.
                       If you don't use this, you should send an input file
                       as an argument to the submit() subroutine
       out          => output result file
       cluster_host => cluster host name, defaults to "solanine"
                       ("host" is accepted as an alias)
       job_wait     => refresh time (seconds) between checks while
                       spinning, defaults to 10
       tmp_base     => base directory for temporary files; defaults to "."
                       when run_locally is set, otherwise "/data/shared/tmp"
       run_locally  => flag to run the process locally instead of on the
                       cluster
 Ret:  A cluster object

=cut

sub new {
    my $class = shift;
    my $self  = bless {}, $class;
    my $args  = shift || {};

    # Older documentation used the key "host" while this constructor only
    # read "cluster_host"; accept both, with "cluster_host" taking priority.
    my $host = $args->{cluster_host} || $args->{host};

    $self->cluster_host($host);
    $self->job_wait($args->{job_wait});
    $self->tmp_base($args->{tmp_base});
    $self->infile($args->{in});
    $self->outfile($args->{out});

    $self->cluster_host("solanine") unless $self->cluster_host();
    $self->run_locally($args->{run_locally});

    # Local runs keep temporary files in the current directory unless the
    # caller chose a tmp_base explicitly.
    $self->tmp_base(".") if $self->run_locally() && !$args->{tmp_base};
    $self->tmp_base("/data/shared/tmp") unless $self->tmp_base();
    $self->job_wait(10) unless $self->job_wait();

    return $self;
}
=head2 submit()

 Abstract method; the base-class version always dies. A subclass
 implementation should do one of:
   1) split infile(), submit one job per piece, and set cluster_outs()
      and jobs(), or
   2) take one input file as an argument, submit one job, and push that
      job and its cluster output file onto jobs() and cluster_outs().

=cut

sub submit {
    # Nothing sensible to submit at the base-class level.
    die "Override this function in a subclass";
}
=head2 chill()

 Throttles submissions so qsub is not hit too quickly. Subclasses call
 $self->chill() before submitting each job.
 Args: (optional) pause length in MICROseconds (passed to usleep); a false
       value means 250_000, i.e. a quarter second (at most 4 per second)
 Ret:  the value returned by usleep()

=cut

sub chill {
    my ($self, $pause) = @_;
    my $microseconds = $pause || 250_000;
    usleep($microseconds);
}
=head2 alive()

 Asks every job in jobs() whether it is still alive.
 Ret: 1 if at least one job reports alive, 0 if none do (including when
      there are no jobs at all)

=cut

sub alive {
    my ($self) = @_;
    my $jobs = $self->jobs();
    my $any_running = 0;
    # Every job is polled; there is no early exit once one is found running.
    for my $job (@$jobs) {
        if ($job->alive()) {
            $any_running = 1;
        }
    }
    return $any_running;
}
=head2 spin()

 Blocks until alive() reports that no job is running, sleeping between
 checks.
 Args: (optional) wait time between alive() checks, in whole seconds
       (built-in sleep); a false value falls back to $self->job_wait()
 Ret:  Nothing useful

=cut

sub spin {
    my ($self, $wait_time) = @_;
    $wait_time = $self->job_wait() unless $wait_time;
    print STDERR "\nAll jobs submitted, now we wait...";
    while ($self->alive()) {
        sleep($wait_time);
        if (DEBUG) {
            print STDERR ".";
        }
    }
}
=head2 job_sizes()

 A handy little utility to get chunks of roughly equal size.
 Args: Total Size, Minimum # of Pieces, Maximum Piece Size
       - a Minimum # of Pieces below 1 (or undef) is treated as 1
       - an undefined Maximum Piece Size means "no cap"; a defined one
         must be positive, otherwise this dies
 Ret:  A list of piece sizes. Every piece has the same size except
       possibly the last, which holds the remainder; the sizes add up to
       Total Size. Returns an empty list when Total Size is 0.
 Note: the common piece size is rounded up, so fewer than Minimum # of
       Pieces can come back (e.g. 10, 6, 5 gives 2,2,2,2,2).

=cut

sub job_sizes {
    my $self = shift;
    my ($total_size, $min_pieces, $max_piece_size) = @_;

    # A minimum below one piece is meaningless and would divide by zero.
    $min_pieces = 1 if !defined $min_pieces || $min_pieces < 1;
    die "job_sizes: maximum piece size must be positive"
        if defined $max_piece_size && $max_piece_size <= 0;

    my $piece_size = ceil($total_size / $min_pieces);
    if (defined $max_piece_size && $piece_size > $max_piece_size) {
        $piece_size = $max_piece_size;
        my $num_pieces = ceil($total_size / $piece_size);
        # Shrink until there are enough pieces, but never below 1: a zero
        # piece size would divide by zero here and never finish below.
        until ($num_pieces >= $min_pieces || $piece_size <= 1) {
            $piece_size = int($piece_size * 0.5);
            $piece_size = 1 if $piece_size < 1;
            $num_pieces = ceil($total_size / $piece_size);
        }
    }

    my @sizes;
    my $sum = 0;
    while ($sum < $total_size) {
        my $remaining = $total_size - $sum;
        if ($remaining <= $piece_size) {
            push @sizes, $remaining;
            last;
        }
        $sum += $piece_size;
        push @sizes, $piece_size;
    }
    return @sizes;
}
=head2 concat()

 Concatenates all of the cluster_outs() files, in order, into outfile().
 Dies if outfile() cannot be opened for writing or fails to close; a
 cluster output file that cannot be opened is skipped with a warning.
 The cluster_outs() array is left unmodified.
 Ret: 1 on success

=cut

sub concat {
    my $self = shift;
    my $outfiles = $self->cluster_outs() || [];

    # Three-arg open on a lexical handle: the file name cannot change the
    # open mode, and no package-global handle is clobbered.
    open(my $write_fh, '>', $self->outfile())
        or die "\nCan't open final write file: $!";
    print STDERR "\nConcatenating cluster outputs to final file";

    # Named loop variable on purpose: a bare foreach aliases $_ to each
    # cluster_outs() element, and `while (<$fh>)` assigns to $_, which
    # would overwrite the stored file names.
    foreach my $cluster_out (@$outfiles) {
        if (open(my $read_fh, '<', $cluster_out)) {
            while (my $line = <$read_fh>) {
                print {$write_fh} $line;
            }
            close($read_fh);
        }
        else {
            # Unreadable outputs are skipped (best effort), but not silently.
            warn "\nCan't open cluster output file '$cluster_out': $!";
        }
        print STDERR ".";
    }

    # Buffered write errors only surface at close time.
    close($write_fh) or die "\nCan't write final file: $!";
    return 1;
}
=head2 push_job() and push_cluster_out()

 * push_job($job)           - append a job to the jobs() array ref
 * push_cluster_out($file)  - append an output file to the cluster_outs()
                              array ref
 Both ignore a false argument (returning nothing) and create the array on
 first use. Otherwise they return the updated array ref.

=cut

sub push_job {
    my ($self, $job) = @_;
    return unless $job;
    my $list = $self->jobs();
    push @$list, $job;
    # Storing the ref back is required: when jobs() was still unset, push()
    # autovivified a brand-new array that only $list refers to.
    return $self->jobs($list);
}

sub push_cluster_out {
    my ($self, $cluster_out) = @_;
    if (!$cluster_out) {
        return;
    }
    my $outs = $self->cluster_outs();
    push @{$outs}, $cluster_out;
    # As in push_job(): persist a freshly autovivified array.
    return $self->cluster_outs($outs);
}
=head2 Getter/Setters

Each accessor returns the current value; passing an argument sets it
first. Setters ignore false values (undef, "" and 0), except
run_locally(), which accepts any defined value. jobs() and cluster_outs()
only accept unblessed ARRAY references.

 jobs()         - an array reference of the jobs returned by CXGN::Tools::Run
 outfile()      - the final output file of the process, usually concatenated
                  from the cluster outputs
 infile()       - the original input file for the process
 stdout()       - plain get/set slot named "stdout"; its use is up to the
                  subclass
 stderr()       - plain get/set slot named "stderr"; its use is up to the
                  subclass
 cluster_outs() - array reference to cluster output files, as you choose
                  them to be. The standard concat() function takes these
                  and glues them together into the outfile()
 temp_dir()     - the temporary directory where all the cluster outputs and
                  cluster process information are stored. Usually a
                  subdirectory of /data/shared/tmp, but whatever you want it
                  to be
 tmp_base()     - base directory for temporary files; use this to build
                  temp_dir(). Defaults to "/data/shared/tmp"
 cluster_host() - the name of the cluster server, defaults to "solanine"
 job_wait()     - seconds to wait before checking job status again,
                  defaults to 10
 run_locally()  - flag to run the process locally instead of on the cluster;
                  subclasses can implement this however they like

=cut
sub jobs {
    my ($self, $new_jobs) = @_;
    # Only an unblessed ARRAY ref replaces the list (ref() of a blessed ref
    # is its class name); anything else leaves the current value alone.
    $self->{jobs} = $new_jobs if ref($new_jobs) eq 'ARRAY';
    return $self->{jobs};
}
sub outfile {
    my ($self, $path) = @_;
    # A false path (undef, "", 0) is treated as "just read".
    $self->{outfile} = $path if $path;
    return $self->{outfile};
}
sub stdout {
    my ($self, $value) = @_;
    # Read-only call (or false value): report what is stored.
    return $self->{stdout} unless $value;
    return $self->{stdout} = $value;
}
sub stderr {
    my ($self, $value) = @_;
    # Assign only for a true value; either way hand back the stored one.
    return $value ? ($self->{stderr} = $value) : $self->{stderr};
}
sub infile {
    my ($self, $path) = @_;
    # Getter path first; a false argument never overwrites the input file.
    return $self->{infile} unless $path;
    $self->{infile} = $path;
    return $self->{infile};
}
sub cluster_outs {
    my ($self, $files) = @_;
    # Replace the stored list only with an unblessed ARRAY reference.
    if (ref($files) eq 'ARRAY') {
        $self->{cluster_outs} = $files;
    }
    return $self->{cluster_outs};
}
sub temp_dir {
    my $self = shift;
    my ($dir) = @_;
    # Unlike run_locally(), a false value here is simply ignored.
    unless (!$dir) {
        $self->{temp_dir} = $dir;
    }
    return $self->{temp_dir};
}
sub cluster_host {
    my ($self, $host) = @_;
    # Only a true host name is stored.
    if ($host) {
        $self->{cluster_host} = $host;
    }
    return $self->{cluster_host};
}
sub job_wait {
    my ($self, $seconds) = @_;
    # 0/undef are ignored, so a zero wait can never be stored.
    return $self->{job_wait} unless $seconds;
    $self->{job_wait} = $seconds;
    return $self->{job_wait};
}
sub tmp_base {
    my ($self, $base_dir) = @_;
    # Store only a true directory name; always report the current one.
    if ($base_dir) {
        $self->{tmp_base} = $base_dir;
    }
    return $self->{tmp_base};
}
327 sub run_locally {
328 my $self = shift;
329 my $flag = shift;
330 return $self->{run_locally} unless defined $flag;
331 $self->{run_locally} = $flag;