Merge pull request #5051 from solgenomics/topic/locus_owner
[sgn.git] / lib / solGS / JobSubmission.pm
blobb7a7b529a104d275a577193f861afd4817b6824c
1 package solGS::JobSubmission;
3 use Moose;
4 use namespace::autoclean;
6 use CXGN::Tools::Run;
7 use Scalar::Util qw /weaken reftype/;
8 use Storable qw/ nstore retrieve /;
9 use solGS::queryJobs;
11 with 'MooseX::Getopt';
12 with 'MooseX::Runnable';
14 has "prerequisite_jobs" => (
15 is => 'ro',
16 isa => 'Str',
19 has "dependent_jobs" => (
20 is => 'ro',
21 isa => 'Str',
22 required => 1,
25 has "analysis_report_job" => (
26 is => 'ro',
27 isa => 'Str',
30 has "config_file" => (
31 is => 'ro',
32 isa => 'Str',
36 sub run {
37 my $self = shift;
38 my $secs = 30; #60 * 4;
40 my $pre_jobs = $self->run_prerequisite_jobs();
41 sleep($secs);
42 print STDERR
43 "\nCompleted prerequisite jobs. After waiting $secs sec...Now running the set of dependent jobs...\n";
45 my $dep_jobs = $self->run_dependent_jobs();
46 sleep($secs);
47 print STDERR
48 "\nCompleted dependent jobs. After waiting $secs sec...Now checking results and emailing the results...\n";
50 $self->send_analysis_report();
51 print STDERR "\nGot done checking results and emailing the results...\n";
55 sub run_prerequisite_jobs {
56 my $self = shift;
58 my $remaining_jobs;
59 my $pre_jobs = $self->prerequisite_jobs;
60 if ( $pre_jobs !~ /none/ ) {
61 $pre_jobs = retrieve($pre_jobs);
62 my $type = reftype $pre_jobs;
64 if ( reftype $pre_jobs eq 'HASH' ) {
66 my $submitted_priority_jobs;
67 foreach my $rank ( sort keys %$pre_jobs ) {
68 my $js = $pre_jobs->{$rank};
70 $submitted_priority_jobs = $self->submit_jobs($js);
72 $remaining_jobs = $self->wait_till_jobs_end($submitted_priority_jobs);
74 else {
75 if ( reftype $pre_jobs eq 'SCALAR' ) {
76 $pre_jobs = [$pre_jobs];
79 my $submitted_jobs = $self->submit_jobs($pre_jobs);
81 $remaining_jobs = $self->wait_till_jobs_end($submitted_jobs);
82 print STDERR "\nremaining jobs: $remaining_jobs\n";
87 return $remaining_jobs;
91 sub wait_till_jobs_end {
92 my ( $self, $jobs, $sleep_time ) = @_;
94 $sleep_time = 30 if !$sleep_time;
95 while (@$jobs) {
96 for ( my $i = 0 ; $i < scalar(@$jobs) ; $i++ ) {
97 splice( @$jobs, $i, 1 ) if !$jobs->[$i]->alive();
100 sleep $sleep_time;
104 my $remaining_jobs = $jobs ? $jobs->[0] : 0;
105 return $remaining_jobs;
108 sub submit_jobs {
109 my ( $self, $jobs ) = @_;
111 my @submitted_jobs;
113 if ( $jobs->[0] ) {
114 foreach my $job (@$jobs) {
115 my $submitted_job = $self->submit_job($job);
116 push @submitted_jobs, $submitted_job;
120 return \@submitted_jobs;
123 sub run_dependent_jobs {
124 my $self = shift;
126 my $jobs_file = $self->dependent_jobs;
127 my $dep_jobs = retrieve($jobs_file);
129 if ( reftype $dep_jobs ne 'ARRAY' ) {
130 $dep_jobs = [$dep_jobs];
133 my $submitted_jobs = $self->submit_jobs($dep_jobs);
135 my $remaining_jobs = $self->wait_till_jobs_end($submitted_jobs);
136 print STDERR "\nremaining jobs: $remaining_jobs\n";
137 return $remaining_jobs;
141 sub send_analysis_report {
142 my $self = shift;
144 my $report_file = $self->analysis_report_job;
145 unless ( $report_file =~ /none/ ) {
146 my $report_job = retrieve($report_file);
147 my $job = $self->submit_job($report_job);
148 return $job;
153 sub submit_job {
154 my ( $self, $args ) = @_;
156 my $job;
157 ###my $config = $self->config_file;
158 ###$config = retrieve($config);
160 print STDERR "submitting job... $args->{cmd}\n";
162 eval {
163 $job = CXGN::Tools::Run->new( $args->{config} );
164 $job->do_not_cleanup(1);
166 $job->is_cluster(1);
167 $job->run_cluster( $args->{cmd} );
169 if ( !$args->{background_job} ) {
170 print STDERR "\n WAITING job to finish\n";
171 $job->wait();
172 print STDERR "\n job COMPLETED\n";
176 if ($@) {
177 print STDERR "An error occurred submitting job $args->{cmd} \n$@\n";
180 return $job;
184 __PACKAGE__->meta->make_immutable;
186 ####
187 1; #
188 ####