From 02fb36e55fc91a250ab5b8fe7edb764e56be66ff Mon Sep 17 00:00:00 2001 From: pontus_pih Date: Fri, 2 Mar 2007 15:46:38 +0000 Subject: [PATCH] Added initial SGE support --- diagrams/modelfit.dia | 184 ++++++++++--- diagrams/tool.dia | 91 +++--- lib/common_options.pm | 1 + lib/tool/modelfit_subs.pm | 683 ++++++++++++++++++++++++++++++---------------- 4 files changed, 636 insertions(+), 323 deletions(-) diff --git a/diagrams/modelfit.dia b/diagrams/modelfit.dia index 9aac0ca..b35e6bf 100644 --- a/diagrams/modelfit.dia +++ b/diagrams/modelfit.dia @@ -85,9 +85,6 @@ - - - ## @@ -100,13 +97,13 @@ - + - + @@ -144,12 +141,6 @@ - - - - - - @@ -163,21 +154,21 @@ - + - + - + - + - + - + @@ -202,18 +193,18 @@ - + - + - - + + @@ -223,9 +214,6 @@ - - - @@ -245,16 +233,16 @@ - + - + - + #modelfit# @@ -289,12 +277,6 @@ - - - - - - @@ -311,7 +293,7 @@ - + @@ -320,9 +302,9 @@ - + - + @@ -2662,6 +2644,136 @@ + + + #sge_submit# + + + ## + + + ## + + + + + + ## + + + + + + + + + + + + + + + + + #jobId# + + + #scalar integer# + + + ## + + + ## + + + + + + + + #model# + + + #object model# + + + ## + + + ## + + + + + + + + #nm_version# + + + #scalar integer# + + + ## + + + ## + + + + + + + + + + #sge_monitor# + + + ## + + + ## + + + + + + ## + + + + + + + + + + + + + + + + + #jobId# + + + #scalar integer# + + + ## + + + ## + + + + + + + diff --git a/diagrams/tool.dia b/diagrams/tool.dia index e09cb56..95cedfb 100644 --- a/diagrams/tool.dia +++ b/diagrams/tool.dia @@ -68,16 +68,16 @@ - + - + - + #tool# @@ -112,12 +112,6 @@ - - - - - - @@ -134,7 +128,7 @@ - + @@ -143,9 +137,9 @@ - + - + @@ -1958,9 +1952,7 @@ - - - + #run_on_umbrella# @@ -2075,6 +2067,29 @@ + + + #run_on_sge# + + + #scalar boolean# + + + #0# + + + ## + + + + + + + + + + + @@ -3154,13 +3169,13 @@ - + - + @@ -3198,12 +3213,6 @@ - - - - - - @@ -3220,7 +3229,7 @@ - + @@ -3229,9 +3238,9 @@ - + - + @@ -3256,19 +3265,19 @@ - + - + - + - - + + @@ -3299,9 +3308,6 @@ - - - @@ -3316,9 +3322,6 @@ - - - @@ -3326,19 +3329,19 @@ - + - + - - + + @@ -3369,9 +3372,6 @@ - - - @@ -3386,9 +3386,6 @@ - - - diff --git a/lib/common_options.pm b/lib/common_options.pm index 9917217..ea7e927 100644 --- a/lib/common_options.pm +++ b/lib/common_options.pm @@ -52,6 +52,7 @@ my @tool_options = ( "abort_on_fail", "run_on_lsf", "run_on_ud", "run_on_umbrella", + "run_on_sge", "seed:s", "shrinkage", "significant_digits_rerun:f", diff --git a/lib/tool/modelfit_subs.pm b/lib/tool/modelfit_subs.pm index 05cf75a..70ee3b3 100644 --- a/lib/tool/modelfit_subs.pm +++ b/lib/tool/modelfit_subs.pm @@ -19,13 +19,13 @@ start include statements require hotkey; } use nonmem; - use IPC::Open3; use POSIX ":sys_wait_h"; use output; use OSspecific; use ui; use moshog_client; use Time::HiRes; + use ext::IPC::Run3; } end include statements @@ -339,7 +339,8 @@ start new if( $this -> {'run_on_lsf'} or $this -> {'run_on_ud'} or - $this -> {'run_on_umbrella'} ){ + $this -> {'run_on_umbrella'} or + $this -> {'run_on_sge'} ){ $this -> {'run_local'} = 0; } } @@ -348,7 +349,7 @@ end new # }}} new -# {{{ register_in_database5B +# {{{ register_in_database start register_in_database { @@ -597,9 +598,11 @@ start prepare_raw_results $self -> {'models'} -> [$i] -> {'outputs'} -> [0] -> flush; map( $pushed_rows += $_, @probs ); } else { - # Model not parsed successfully. + # Output not parsed successfully. my $row = $pushed_rows+1; - push( @{$self -> {'raw_results'} -> [$row]}, 'run failed' ); + push( @{$self -> {'raw_results'} -> [$pushed_rows++]}, ($i+1). + ",run failed - Could not parse the output file: ". 
+ $self -> {'models'} -> [$i] -> {'outputs'} -> [0] -> filename ); } } $raw_line_structure -> write( 'raw_file_structure' ); @@ -617,8 +620,16 @@ start prepare_raw_results my $success; foreach my $param ( @params ) { if ( $name eq $param ){ - for ( my $i = 1; $i <= $max_hash{ $name }; $i++ ) { - push ( @new_header, uc(substr($name,0,2)).$i ); + if ( $name eq 'shrinkage_etas' ){ + for ( my $i = 1; $i <= $max_hash{ $name }; $i++ ) { + push ( @new_header, 'shrinkage_eta'.$i ); + } + } elsif ( $name eq 'shrinkage_wres' ){ + push ( @new_header, 'shrinkage_wres' ); + } else { + for ( my $i = 1; $i <= $max_hash{ $name }; $i++ ) { + push ( @new_header, uc(substr($name,0,2)).$i ); + } } $success = 1; last; @@ -680,8 +691,11 @@ start print_raw_results my $append = $self -> {'raw_results_append'} ? '>>' : '>'; open( RRES, $append.$dir.$file ); for ( my $i = 0; $i < scalar @{$self -> {'raw_nonp_results'}}; $i++ ) { - my @result_array = @{$self -> {'raw_nonp_results'} -> [$i]}; - map( $_ = defined $_ ? $_ : $PsN::out_miss_data, @result_array ); + my @result_array; + if ( defined $self -> {'raw_nonp_results'} -> [$i] ) { + @result_array = @{$self -> {'raw_nonp_results'} -> [$i]}; + map( $_ = defined $_ ? $_ : $PsN::out_miss_data, @result_array ); + } print RRES join(',',@result_array ),"\n"; } close( RRES ); @@ -785,6 +799,7 @@ start copy_model_and_input data_file_names => \@new_data_names, extra_data_file_names => \@new_extra_data_names, copy_data => 1 ); + $candidate_model -> register_in_database; $model -> flush_data; @@ -796,8 +811,7 @@ start copy_model_and_input $candidate_model -> add_extra_data_code; $candidate_model -> write_get_subs; $candidate_model -> write_readers; - $candidate_model -> _write( filename => 'psn.mod', - write_data => 1 ); #Kolla denna, den funkar inte utan wrap!! + $candidate_model -> _write( filename => 'psn.mod' );# write_data => 1 ); #Kolla denna, den funkar inte utan wrap!! $candidate_model -> flush_data; $candidate_model -> store_inits; @@ -1320,187 +1334,280 @@ end ud_submit # }}} -# {{{ ud_monitor_retrieve +# {{{ ud_monitor -start ud_monitor_retrieve +start ud_monitor { - unless( $self -> {'ud_native_retrieve'} ){ - my $script; - unless( defined $PsN::config -> {'_'} -> {'ud_nonmem'} ){ - if( $Config{osname} eq 'MSWin32' ) { - $script = 'nonmem.bat'; - } else { - $script = 'nonmem.sh'; - } +# unless( $self -> {'ud_native_retrieve'} ){ + + my $script; + unless( defined $PsN::config -> {'_'} -> {'ud_nonmem'} ){ + if( $Config{osname} eq 'MSWin32' ) { + $script = 'nonmem.bat'; } else { - $script = $PsN::config -> {'_'} -> {'ud_nonmem'}; - } - - if( system("$script -b -c -d . -r $jobId > nonmem_bat_stdout") ){ - 'debug' -> die( message => "UD submit script failed.\nSystem error message:$!" ); - } - - if( $Config{osname} ne 'MSWin32' ){ - cp('psn.LST','psn.lst'); - unlink('psn.LST'); + $script = 'nonmem.sh'; } - } else { + $script = $PsN::config -> {'_'} -> {'ud_nonmem'}; + } + + + my $stdout; # Variable to store output from "nonmem.bat" + + unless( run3( "$script -l " . $jobId, undef, \$stdout ) ){ # run3 will capture the output + 'debug' -> die( message => "UD submit script failed, check that $script is in your PATH.\nSystem error message: $!" ); + } + my @output_lines = split( /\n/, $stdout ); # I'm splitting the output into an array for easier handling. + debug -> warn( level => 2, + message => "$stdout" ); + foreach my $line( @output_lines ){ # loop over lines + if( $line =~ /Job State:\s+Completed/ ){ # regexp to find finished jobs. 
+ debug -> warn( level => 1, + message => "Returning $jobId" ); + return $jobId; # Return the jobId found. + } + } + + return 0; + +# {{{ ud_native_retrieve + +# } else { # ud_native_retrieve - require SOAP::Lite; # MGSI SOAP interface - require LWP::UserAgent; # MGSI Fileservice interface +# require SOAP::Lite; # MGSI SOAP interface +# require LWP::UserAgent; # MGSI Fileservice interface - my %confparams; +# my %confparams; - # MGSI constants - uduserconf_read_file("./uduserconf", $ENV{HOME} ? ("$ENV{HOME}/uduserconf", "$ENV{HOME}/.uduserconf") : ()); - my $mgsisoapurl = $confparams{MGSI_SOAP_URL}; - my $mgsifilesvr = $confparams{MGSI_FILESVR_URL}; - my $mgsiuser = $confparams{MGSI_USERNAME}; - my $mgsipwd = $confparams{MGSI_PASSWORD}; +# # MGSI constants +# uduserconf_read_file("./uduserconf", $ENV{HOME} ? ("$ENV{HOME}/uduserconf", "$ENV{HOME}/.uduserconf") : ()); +# my $mgsisoapurl = $confparams{MGSI_SOAP_URL}; +# my $mgsifilesvr = $confparams{MGSI_FILESVR_URL}; +# my $mgsiuser = $confparams{MGSI_USERNAME}; +# my $mgsipwd = $confparams{MGSI_PASSWORD}; - # MGSI objects - my $server = new SOAP::Lite - -> uri('urn://ud.com/mgsi') - -> proxy($mgsisoapurl); - my $ua = new LWP::UserAgent; # mgsi filesvr HTTP object +# # MGSI objects +# my $server = new SOAP::Lite +# -> uri('urn://ud.com/mgsi') +# -> proxy($mgsisoapurl); +# my $ua = new LWP::UserAgent; # mgsi filesvr HTTP object - ############################## - ## LOG IN TO MGSI SERVER ## - ############################## - my $auth = soapvalidate($server->login($mgsiuser, $mgsipwd)); # mgsi authentication token +# ############################## +# ## LOG IN TO MGSI SERVER ## +# ############################## +# my $auth = soapvalidate($server->login($mgsiuser, $mgsipwd)); # mgsi authentication token - ################################ - ## RETRIEVE JOB INFORMATION ## - ################################ - my $job = soapvalidate($server->getJobById($auth, $jobId)); +# ################################ +# ## RETRIEVE JOB INFORMATION ## +# ################################ +# my $job = soapvalidate($server->getJobById($auth, $jobId)); - my $jobstep; - my $workunits; - do { - my $jobsteps = soapvalidate($server->getJobStepsByJob($auth, $$job{job_gid})); +# my $jobstep; +# my $workunits; +# do { +# my $jobsteps = soapvalidate($server->getJobStepsByJob($auth, $$job{job_gid})); - $jobstep = @$jobsteps[1]; # this job only has one jobstep. - my $workunits_array = soapvalidate($server->getWorkunits($auth, - {job_step_gid_match => [$$jobstep{job_step_gid}]}, - "", 0, -1)); # no ordering, start at record 0, give me all wus - $workunits = $$workunits_array{records}; - my $output_file = $$job{description}; +# $jobstep = @$jobsteps[1]; # this job only has one jobstep. 
+# my $workunits_array = soapvalidate($server->getWorkunits($auth, +# {job_step_gid_match => [$$jobstep{job_step_gid}]}, +# "", 0, -1)); # no ordering, start at record 0, give me all wus +# $workunits = $$workunits_array{records}; +# my $output_file = $$job{description}; - #print "job $$job{job_gid}; jobstep $$jobstep{job_step_gid}\n" if $mgsidebug; - #print "jobstep state is $$jobstep{state_id}\n" if $mgsidebug; +# #print "job $$job{job_gid}; jobstep $$jobstep{job_step_gid}\n" if $mgsidebug; +# #print "jobstep state is $$jobstep{state_id}\n" if $mgsidebug; - my $jobstepstatus = soapvalidate($server->getJobStepStatus($auth, $$jobstep{job_step_gid})); +# my $jobstepstatus = soapvalidate($server->getJobStepStatus($auth, $$jobstep{job_step_gid})); - sleep($self -> {'ud_sleep'}); - } while( $$jobstep{state_id} != 3 ); +# sleep($self -> {'ud_sleep'}); +# } while( $$jobstep{state_id} != 3 ); - # Retrieve all results by going through every workunit in this job +# # Retrieve all results by going through every workunit in this job - foreach my $workunit (@$workunits) { - my $results = soapvalidate($server->getResults($auth, { - workunit_gid_match => [$$workunit{workunit_gid}], # filter by workunit guid - success_active => 1, success_value => 1 # only retrieve successful results - }, - "", 0, 1)); # no ordering, start at record 0, give me 1 result - # if you want to actually compare redundant results to see if there is a quorum - # here would be a good place to retrieve all results and 'diff' them - # for now, I just retrieve 1 results through the queue and go with that - if (not exists $$results{records}) { - 'debug' -> die( message => 'Found a Workunit without any successful Results, aborting retrieval.' ); +# foreach my $workunit (@$workunits) { +# my $results = soapvalidate($server->getResults($auth, { +# workunit_gid_match => [$$workunit{workunit_gid}], # filter by workunit guid +# success_active => 1, success_value => 1 # only retrieve successful results +# }, +# "", 0, 1)); # no ordering, start at record 0, give me 1 result +# # if you want to actually compare redundant results to see if there is a quorum +# # here would be a good place to retrieve all results and 'diff' them +# # for now, I just retrieve 1 results through the queue and go with that +# if (not exists $$results{records}) { +# 'debug' -> die( message => 'Found a Workunit without any successful Results, aborting retrieval.' ); - } - my $result = $$results{records}[0]; - my $tempfile = "package.tar"; #mktemp("tmpresXXXXXX"); - # open(PACKAGE, ">package.tar"); - my $resulturl = "$mgsifilesvr?auth=$auth&hash=$$result{result_hash}"; - my $request = HTTP::Request->new('GET', $resulturl); - my $response = $ua->request($request, $tempfile); - if ($response->is_error() ) { - 'debug' -> die( message => "Couldn't retrieve result file, server returned ".$response->status_line ); - } elsif ($response->header('Content-Length') != -s $tempfile) { - 'debug' -> die( message => "Incomplete file returned from server (expected ".$response->header('Content-Length')." but received ".(-s $tempfile).")." 
); - } +# } +# my $result = $$results{records}[0]; +# my $tempfile = "package.tar"; #mktemp("tmpresXXXXXX"); +# # open(PACKAGE, ">package.tar"); +# my $resulturl = "$mgsifilesvr?auth=$auth&hash=$$result{result_hash}"; +# my $request = HTTP::Request->new('GET', $resulturl); +# my $response = $ua->request($request, $tempfile); +# if ($response->is_error() ) { +# 'debug' -> die( message => "Couldn't retrieve result file, server returned ".$response->status_line ); +# } elsif ($response->header('Content-Length') != -s $tempfile) { +# 'debug' -> die( message => "Incomplete file returned from server (expected ".$response->header('Content-Length')." but received ".(-s $tempfile).")." ); +# } - require Archive::Tar; +# require Archive::Tar; - my $tar = Archive::Tar->new; +# my $tar = Archive::Tar->new; - $tar->read('package.tar',0, {extract => 1}); +# $tar->read('package.tar',0, {extract => 1}); - if( $Config{osname} ne 'MSWin32' ){ - cp('psn.LST','psn.lst'); - unlink('psn.LST'); - } +# if( $Config{osname} ne 'MSWin32' ){ +# cp('psn.LST','psn.lst'); +# unlink('psn.LST'); +# } - # add data to total frequency list - } +# # add data to total frequency list +# } - # Optional deletion of job - #if ($deletejob eq '-d') { - #print "now deleting job $jobId..."; - #soapvalidate($server->deleteJob($auth, $$job{job_gid})); - #} +# # Optional deletion of job +# #if ($deletejob eq '-d') { +# #print "now deleting job $jobId..."; +# #soapvalidate($server->deleteJob($auth, $$job{job_gid})); +# #} - # helper subroutines +# # helper subroutines - # read in configuration - sub uduserconf_read_file { - my @files = @_; +# # read in configuration +# sub uduserconf_read_file { +# my @files = @_; - my $file = undef; - foreach (@files) { - if (-f($_)) { - $file = $_; - last; - } - } - defined($file) or 'debug' -> die(message => "Could not find any of: " . join(' ', @files) ); - open(FH,$file) or 'debug' -> die(message => "Could not open $file: $!"); +# my $file = undef; +# foreach (@files) { +# if (-f($_)) { +# $file = $_; +# last; +# } +# } +# defined($file) or 'debug' -> die(message => "Could not find any of: " . join(' ', @files) ); +# open(FH,$file) or 'debug' -> die(message => "Could not open $file: $!"); - # Main parsing loop for the file's contents. - while () { - if (/^\s*(\w+)\s*=\s*\"([^\"]*)\"/ or /^\s*(\w+)\s*=\s*(\S+)\s*/) { - $confparams{uc($1)} = $2; - } - } +# # Main parsing loop for the file's contents. 
+# while () { +# if (/^\s*(\w+)\s*=\s*\"([^\"]*)\"/ or /^\s*(\w+)\s*=\s*(\S+)\s*/) { +# $confparams{uc($1)} = $2; +# } +# } - close(FH); +# close(FH); - foreach ("MGSI_FILESVR_URL", - "MGSI_SOAP_URL", - "MGSI_USERNAME", - "MGSI_PASSWORD") - { - if (!defined($confparams{$_})) { - 'debug' -> die (message => "$_ must be defined in $file" ); - } - } - } +# foreach ("MGSI_FILESVR_URL", +# "MGSI_SOAP_URL", +# "MGSI_USERNAME", +# "MGSI_PASSWORD") +# { +# if (!defined($confparams{$_})) { +# 'debug' -> die (message => "$_ must be defined in $file" ); +# } +# } +# } - # soap response validation routine - sub soapvalidate { - my ($soapresponse) = @_; - if ($soapresponse->fault) { - 'debug' -> die(message => "fault: ", $soapresponse->faultcode, " ", $soapresponse->faultstring ); - } else { - return $soapresponse->result; - } - } - } +# # soap response validation routine +# sub soapvalidate { +# my ($soapresponse) = @_; +# if ($soapresponse->fault) { +# 'debug' -> die(message => "fault: ", $soapresponse->faultcode, " ", $soapresponse->faultstring ); +# } else { +# return $soapresponse->result; +# } +# } +# } + +# }}} + } -end ud_monitor_retrieve +end ud_monitor # }}} # {{{ ud_retrieve start ud_retrieve { - 1; + my $script; + unless( defined $PsN::config -> {'_'} -> {'ud_nonmem'} ){ + if( $Config{osname} eq 'MSWin32' ) { + $script = 'nonmem.bat'; + } else { + $script = 'nonmem.sh'; + } + } else { + $script = $PsN::config -> {'_'} -> {'ud_nonmem'}; + } + + my $subDir = "NM_run".($run_no+1); + my ($tmp_dir, $file) = OSspecific::absolute_path( $self -> {'directory'}.'/' . + $subDir, ''); + if( system("$script -b -c -d ".$tmp_dir." -r $jobId > nonmem_bat_stdout") ){ + 'debug' -> die( message => "UD submit script failed.\nSystem error message:$!" ); + } + + if( $Config{osname} ne 'MSWin32' ){ + cp($tmp_dir.'/psn.LST',$tmp_dir.'/psn.lst'); + unlink($tmp_dir.'/psn.LST'); + } } end ud_retrieve # }}} +# {{{ sge_submit + +start sge_submit +{ + my $fsubs = join( ',' , @{$model -> subroutine_files} ); + + if( system( 'qsub -cwd -b y ' . + ($PsN::config -> {'_'} -> {'remote_perl'} ? ' ' . $PsN::config -> {'_'} -> {'remote_perl'} : ' perl ') . " -I" . + $PsN::lib_dir ."/../ " . + $PsN::lib_dir . "/nonmem.pm" . + " psn.mod psn.lst " . + $self -> {'nice'} . " ". + $nm_version . " " . + 1 . " " . # compilation + 1 . " " . # execution + $fsubs . " " . + $self -> {'nm_directory'} . ' > JobId' ) ){ + 'debug' -> die( message => "Grid submit failed.\nSystem error message: $!" ); + } + + open(JOBFILE, "JobId") or 'debug' -> die( message => "Couldn't open grid JobId file for reading: $!" ); + while( ){ + if( /Your job (\d+)/ ){ + $jobId = $1; + } + } + close(JOBFILE); +} +end sge_submit + +# }}} + +# {{{ sge_monitor + +start sge_monitor +{ + my $stdout; + if( system( "qstat -j $jobId > JobStat" ) ){ + 'debug' -> die( message => "Grid submit failed.\nSystem error message: $!" ); + } + + open(JOBFILE, "JobStat") or 'debug' -> die( message => "Couldn't open grid JobStat file for reading: $!" ); + while( ){ + if( /Following jobs do not exist:/ ){ # regexp to find finished jobs. + close(JOBFILE); + return $jobId; # Return the jobId found. + } + } + close(JOBFILE); + + return 0; +} +end sge_monitor + +# }}} + # {{{ run_nonmem start run_nonmem @@ -1529,11 +1636,10 @@ start run_nonmem } $candidate_model->_write(); my $nonmem; - + # We do not expect any values of rerun lower than 1 here. (a bug otherwise...) if( not -e 'psn-' . ( $tries + 1 ) . 
'.lst' or $self -> {'rerun'} >= 2 ){ if( $queue_info -> {'modelfile_tainted'} ){ # We need to recompile - # -------------- Notes about local vs remote compilation/execution ----------------- # Here we check wheter compilation and/or exection will be done @@ -1541,7 +1647,7 @@ start run_nonmem # so far). So, unless we are doing everything remotely, we will # need a "nonmem" object. Which is created here. - if( ( $self -> run_on_lsf() or $self -> run_on_ud() or $self -> run_on_umbrella() ) and + if( ( not $self -> {'run_local'} ) and ( $self -> {'no_remote_compile'} or $self -> {'no_remote_execution'} ) ){ @@ -1562,17 +1668,19 @@ start run_nonmem my $fsubs = join( ',' , @{$model -> subroutine_files} ); my ($stdin,$stdout,$stderr); - my $pid = open3($stdin,$stdout,$stderr, - ($PsN::config -> {'_'} -> {'remote_perl'} ? ' ' . $PsN::config -> {'_'} -> {'remote_perl'} : ' perl ') . " -I" . - $PsN::lib_dir ."/../ " . - $PsN::lib_dir . "/nonmem.pm" . - " psn.mod psn.lst " . - $self -> {'nice'} . " ". - $nm_version . " " . - 1 . " " . # request compilation - 0 . " " . # no execution - $fsubs . " " . - $self -> {'nm_directory'} ); + my $pid = fork(); + if( $pid == 0 ){ + exec( ($PsN::config -> {'_'} -> {'remote_perl'} ? ' ' . $PsN::config -> {'_'} -> {'remote_perl'} : ' perl ') . " -I" . + $PsN::lib_dir ."/../ " . + $PsN::lib_dir . "/nonmem.pm" . + " psn.mod psn.lst " . + $self -> {'nice'} . " ". + $nm_version . " " . + 1 . " " . # compilation + 0 . " " . # no execution + $fsubs . " " . + $self -> {'nm_directory'} ); + } $queue_info->{'compile_only'} = 1; $queue_map->{$pid} = $run_no; @@ -1620,11 +1728,10 @@ start run_nonmem if( ( $self -> {'run_local'} and not $compile_only ) or ( not $self -> {'run_local'} and $self -> {'no_remote_execution'} ) ){ - + # Normal local execution open( NMMSG , "; my $compilation_message = join( '',@tmp ); @@ -1638,21 +1745,21 @@ start run_nonmem } my $fsubs = join( ',' , @{$model -> subroutine_files} ); my ($stdin,$stdout,$stderr); - my $pid = open3($stdin,$stdout,$stderr, - ($PsN::config -> {'_'} -> {'remote_perl'} ? ' ' . $PsN::config -> {'_'} -> {'remote_perl'} : ' perl ') . " -I" . - $PsN::lib_dir ."/../ " . - $PsN::lib_dir . "/nonmem.pm" . - " psn.mod psn.lst " . - $self -> {'nice'} . " ". - $nm_version . " " . - 0 . " " . # request compilation - 1 . " " . # no execution - $fsubs . " " . - $self -> {'nm_directory'} ); + my $pid = fork(); + if( $pid == 0 ){ + exec( ($PsN::config -> {'_'} -> {'remote_perl'} ? ' ' . $PsN::config -> {'_'} -> {'remote_perl'} : ' perl ') . " -I" . + $PsN::lib_dir ."/../ " . + $PsN::lib_dir . "/nonmem.pm" . + " psn.mod psn.lst " . + $self -> {'nice'} . " ". + $nm_version . " " . + 0 . " " . # no compilation + 1 . " " . # execution + $fsubs . " " . + $self -> {'nm_directory'} ); + } $queue_info->{'compile_only'} = 0; - $queue_info->{'candidate_model'} = $candidate_model; - $queue_map->{$pid} = $run_no; } elsif( $self -> {'run_on_lsf'} ) { @@ -1675,13 +1782,27 @@ start run_nonmem model => $candidate_model, nm_version => $nm_version, prepare_jobs => 1); - } elsif ( $self -> run_on_ud() and $self -> no_remote_execution() ) { - + } elsif ( $self -> run_on_ud() and not $self -> no_remote_execution() ) { my $jobId = $self -> ud_submit( model => $candidate_model ); - $self -> ud_monitor_retrieve( jobId => $jobId ); - #### OBS! 
Måste skrivas om så att vi inte väntar här (i ud_monitor_retrieve) + $queue_info->{'compile_only'} = 0; + $queue_map->{$jobId} = $run_no; + + } elsif ( $self -> run_on_sge() and not $self -> no_remote_execution() ){ + my $jobId = $self -> sge_submit( model => $candidate_model, + nm_version => $nm_version ); + + $queue_info->{'compile_only'} = 0; + $queue_map->{$jobId} = $run_no; } + } elsif( $self -> {'rerun'} >= 1 ){ + + # We are not forcing a rerun, but we want to recheck the + # output files for errors. Therefore we put a fake entry in + # queue_map to trigger "restart_nonmem()". + + $queue_info->{'compile_only'} = 0; + $queue_map->{'rerun_'.$run_no} = $run_no; #Fake pid } # end of "not -e psn-$tries.lst or rerun" } end run_nonmem @@ -1720,10 +1841,16 @@ start restart_needed # We need the trail of files to select the most appropriate at the end # (see copy_model_and_output) - my $tries = \$queue_info{'tries'}; - my $model = $queue_info{'model'}; - my $candidate_model = $queue_info{'candidate_model'}; - my $modelfile_tainted = \$queue_info{'modelfile_tainted'}; + unless( defined $parm{'queue_info'} ){ + # The queue_info must be defined here! + 'debug' -> die( message => "Internal run queue corrupt\n" ); + } + my $queue_info_ref = $parm{'queue_info'}; + my $run_results = $queue_info_ref -> {'run_results'}; + my $tries = \$queue_info_ref -> {'tries'}; + my $model = $queue_info_ref -> {'model'}; + my $candidate_model = $queue_info_ref -> {'candidate_model'}; + my $modelfile_tainted = \$queue_info_ref -> {'modelfile_tainted'}; my $lstsuccess = 0; for( my $lsttry = 1; $lsttry <= 5; $lsttry++ ){ @@ -1741,6 +1868,11 @@ start restart_needed } } } + if( defined $model -> extra_output() ){ + foreach my $file( @{$model -> extra_output()} ){ + cp( $file, $file.'-'.(${$tries}+1) ); + } + } } else { # This is rerun==1, i.e. re-evaluate the stuff that has been @@ -1782,7 +1914,7 @@ start restart_needed return(0); } if( not $output_file -> parsed_successfully() and - $queue_info{'crashes'} <= $self -> restart_crashes() ) { + $queue_info_ref -> {'crashes'} <= $self -> restart_crashes() ) { # If the output file could not be parsed successfully, this is # a sign of a crashed run. This is not a NONMEM error as such @@ -1794,7 +1926,7 @@ start restart_needed message => "Restarting crashed run ". $output_file -> full_name(). "\n".$output_file -> parsing_error_message() ); - $queue_info{'crashes'}++; + $queue_info_ref -> {'crashes'}++; cp( 'psn-'.($tries+1).'.mod', 'psn.mod' ); unlink( 'psn-'.($tries+1).'.lst' ); return(1); # Return a one (1) to make run() rerun the @@ -1806,7 +1938,7 @@ start restart_needed # If the output file was parsed successfully, we (re)set the $crashes # variable and continue - $queue_info{'crashes'} = 0; + $queue_info_ref -> {'crashes'} = 0; $minimization_successful = $output_file -> minimization_successful(); $minimization_message = $output_file -> minimization_message(); @@ -1828,7 +1960,7 @@ start restart_needed $run_results -> [${$tries}] -> {'pass_picky'} = 0; # }}} - + # }}} # {{{ Check if maxevals is reached and copy msfo to msfi @@ -1990,7 +2122,6 @@ start restart_needed # bottom. if( not $marked_for_rerun and ${$tries} < $self -> {'min_retries'} ) { - #Here we force pertubation when the model is successful. ${$tries} ++; @@ -2020,6 +2151,7 @@ start restart_needed # }}} $lstsuccess = 1; # We did find the lst file. 
+ last; } else { sleep(($lsttry+1)**2); print "The lst-file is not present, trying ",(5-$lsttry)," times more\n"; @@ -2042,21 +2174,26 @@ start select_best_model # Since we have reruns with pertubation and now also forced (or # automatic) pertubation the final model is not equal to the - # original model. If we consider the models run in the algoritm - # above, there are four implicit subsets. Those that pass the picky - # test, those that don't pass the picky test but have minimization - # successful, those that don't pass the minimization step but - # produce ofv values and, finaly, those that doesn't produce an ofv - # value. The final model will be the model that passes the most - # tests and have the lowest ofv value, and if no ofv value is - # produced, it will be the basic model. + # original model. We consider four implicit subsets. Those that pass + # the picky test, those that don't pass the picky test but have + # minimization successful, those that don't pass the minimization + # step but produce an ofv and, finaly, those that doesn't produce an + # ofv. The final model will be the model that passes the most tests + # and have the lowest ofv value, and if no ofv value is produced, it + # will be the basic model. # Get all runs that passed the picky test (if picky is used) # The order of categories is important. Highest priority goes last. - my $model = $queue_info{'model'}; - my $candidate_model = $queue_info{'candidate_model'}; + unless( defined $parm{'queue_info'} ){ + # The queue_info must be defined here! + 'debug' -> die( message => "Internal run queue corrupt\n" ); + } + my $queue_info_ref = $parm{'queue_info'}; + my $run_results = $queue_info_ref -> {'run_results'}; + my $model = $queue_info_ref -> {'model'}; + my $candidate_model = $queue_info_ref -> {'candidate_model'}; my @selection_categories = ('really_bad','terminated','normal','picky'); my ( %selection, $selected ); @@ -2406,14 +2543,9 @@ start run if( $self -> {'adaptive'} and $self -> {'threads'} > 1 ) { - # Make a moshog request. Answer will be read once here and - # then in run_on_finish - + # Initiate the moshog client $moshog_client = moshog_client -> new(start_threads => $self -> {'threads'}); - $moshog_client -> request( request => scalar @models - $self -> {'threads'} ); - - $threads = $self -> {'threads'} + $moshog_client -> granted(); - + } # }}} @@ -2438,33 +2570,61 @@ start run my @queue = (0..$#models); my $all_jobs_started = 0; - # We loop while there is content in the queue (which can grow) - # and while we have jobs running (represented in the queue_info - + # We loop while there is content in the queue (which shrinks and grows) + # and while we have jobs running (represented in the queue_info) + my $qno = 0; while( @queue or (scalar keys %queue_map > 0) ){ - - # We start by looking for jobs that have been started and have + $qno++; + # We start by looking for jobs that have been started and # finished. If we find one, we set $pid to that job. - + my $pid = 0; - + foreach my $check_pid( keys %queue_map ){ + + if( $check_pid =~ /^rerun_/ ){ + + # A pid that starts with "rerun_" is a rerun and is always + # "finished". + + $pid = $check_pid; + last; + } + + # Diffrent environments have diffrent ways of reporting + # job status. Here we check which environment we are in + # and act accordingly. 
+ + if( $self -> {'run_on_ud'} ){ + + $pid = $self -> ud_monitor( jobId => $check_pid ); + + if( $pid ){ + $self -> ud_retrieve( jobId => $check_pid, + run_no => $queue_map{$check_pid} ); + } + + } elsif( $self -> {'run_on_sge'} ) { - $pid = waitpid($check_pid,WNOHANG); - if( $pid == -1 ){ - - my $run = $queue_map{$check_pid}; + $pid = $self -> sge_monitor( jobId => $check_pid ); - $queue_info{$run}{'nr_wait'}++; - if( $queue_info{$run}{'nr_wait'} > 10 ){ - debug -> die(message => "Nonmem run was lost\n"); + } else { # Local process poll + $pid = waitpid($check_pid,WNOHANG); + if( $pid == -1 ){ + + my $run = $queue_map{$check_pid}; + + $queue_info{$run}{'nr_wait'}++; + if( $queue_info{$run}{'nr_wait'} > 10 ){ + debug -> die(message => "Nonmem run was lost\n"); + } + $pid = $check_pid; } - $pid = $check_pid; } + last if $pid; - + } - # Check if no job has finished if( !$pid ){ # Then if queue is empty or we have $threads number of jobs @@ -2473,8 +2633,31 @@ start run # In that case we should not start another job. (Also # sleep to make polling less demanding). + + if( defined $PsN::config -> {'_'} -> {'job_polling_interval'} and + $PsN::config -> {'_'} -> {'job_polling_interval'} > 0 ) { + sleep($PsN::config -> {'_'} -> {'job_polling_interval'}); + } else { + sleep(1); + } + my $make_request = 0; + if ( defined $PsN::config -> {'_'} -> {'polls_per_moshog_request'} and + $self -> {'adaptive'} and + $self -> {'threads'} > 1 and + not ( $qno % $PsN::config -> {'_'} -> {'polls_per_moshog_request'} ) ) { + $make_request = 1; + } + + if( $make_request and + scalar @queue > 0 ) { + + # Make a moshog request. + + $moshog_client -> request( request => scalar @queue ); + $threads += $moshog_client -> granted(); + + } - sleep(1); next; } } @@ -2538,7 +2721,7 @@ start run push( @{$self -> {'ntries'}}, 0 ); # push( @{$self -> {'evals'}}, \@evals ); - next; # We are done with this model. It has allready been run. + next; # We are done with this model. It has already been run. } @@ -2604,6 +2787,7 @@ start run $queue_info{$run}{'tries'} = 0; $queue_info{$run}{'crashes'} = 0; $queue_info{$run}{'compile_only'} = 1; + $queue_info{$run}{'run_results'} = []; # {{{ printing progress @@ -2653,8 +2837,6 @@ start run my %options_hash = %{$self -> _get_run_options(run_id => $run)}; - my @run_results; - my $work_dir = 'NM_run' . ($run+1) ; chdir( $work_dir ); @@ -2668,18 +2850,18 @@ start run delete( $queue_map{$pid} ); chdir( '..' ); } else { + $self -> compute_cwres( queue_info => $queue_info{$run}, + run_no => $run ); my $do_restart = $self -> restart_needed( %options_hash, queue_info => $queue_info{$run}, - run_no => $run, - run_results => \@run_results ); - + run_no => $run ); if( $do_restart ){ unshift(@queue, $run); delete($queue_map{$pid}); + chdir( '..' 
' );
 	} else {
 	  $self -> select_best_model(run_no => $run,
 				     nm_version => $options_hash{'nm_version'},
-				     run_results => \@run_results,
 				     queue_info => $queue_info{$run});

 	  # {{{ Print finishing messages
@@ -2782,13 +2964,10 @@ start run
     foreach my $row ( @{$select_arr} ){
       if( exists $queue_info{$row -> [0]} ){
-	print "\n Getting Job: \t",$row -> [0], "\n";
-
 	my $work_dir = $row -> [0];
 	my $id = $work_dir;
 	chdir( $work_dir );
 	my $modelfile_tainted = 0;
-	my @run_results;

 	unless( defined $queue_info{$id}{'tries'} ){
 	  $queue_info{$id}{'tries'} = 0;
@@ -2799,8 +2978,7 @@
 	if( $self -> restart_needed( %options_hash,
 				     queue_info => $queue_info{$id},
 				     #evals => \@evals,
-				     run_no => $id,
-				     run_results => \@run_results ) ){
+				     run_no => $id ) ){

 	  my $new_job_id;
@@ -2815,12 +2993,10 @@
 	  %{$queue_info{$new_job_id}} = %{$queue_info{$id}};
 	  delete $queue_info{$id};
-	  print "rerunning ", $queue_info{$id}{'run_no'}, "\n";

 	} else {
 	  $self -> select_best_model(run_no => $queue_info{$id}{'run_no'},
 				     nm_version => $options_hash{'nm_version'},
-				     run_results => \@run_results,
 				     queue_info => $queue_info{$id});

 	  delete $queue_info{$id};
@@ -3286,3 +3462,30 @@ start summarize
 end summarize

 # }}}
+
+# {{{ compute_cwres
+
+start compute_cwres
+my $queue_info_ref = defined $parm{'queue_info'} ? $parm{'queue_info'} : {};
+my $model = $queue_info_ref -> {'model'};
+if( defined $PsN::config -> {'_'} -> {'R'} ) {
+  my $probs = $model -> problems();
+  if( defined $probs ) {
+    foreach my $prob ( @{$probs} ) {
+      if( $prob -> {'cwres_modules'} ){
+	my $sdno = $prob -> {'cwres_modules'} -> [0] -> sdno();
+
+# Create the short R-script to compute the CWRES.
+	open( RSCRIPT, ">compute_cwres.R" );
+	print RSCRIPT "library(xpose4)\ncompute.cwres(\"cwtab$sdno\")\n";
+	close( RSCRIPT );
+
+# Execute the script
+	system( $PsN::config -> {'_'} -> {'R'}." CMD BATCH compute_cwres.R" );
+      }
+    }
+  }
+}
+end compute_cwres
+
+# }}}
-- 
2.11.4.GIT
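
The SGE support added by this patch drives Grid Engine through its command-line tools only: sge_submit shells out to qsub and parses the job id from its confirmation message, and sge_monitor polls qstat -j, treating "Following jobs do not exist" as the signal that the job has finished. The standalone Perl sketch below illustrates that submit-and-poll pattern; it assumes SGE's qsub and qstat are on PATH, and the submitted command ('sleep 10') and the fixed 5-second polling interval are placeholders for illustration, not anything taken from PsN's nonmem.pm wrapper.

    #!/usr/bin/perl
    # Minimal sketch of the qsub/qstat flow used by sge_submit/sge_monitor.
    use strict;
    use warnings;

    # Submit a binary job (-b y) from the current directory (-cwd) and
    # capture qsub's confirmation message; 'sleep 10' is a stand-in job.
    my $qsub_output = `qsub -cwd -b y sleep 10 2>&1`;
    die "qsub failed: $qsub_output" if $?;

    # qsub reports e.g. 'Your job 12345 ("sleep") has been submitted'.
    my ($job_id) = $qsub_output =~ /Your job (\d+)/
        or die "could not parse job id from: $qsub_output";

    # Poll until the scheduler no longer knows the job; for finished or
    # unknown jobs "qstat -j <id>" prints "Following jobs do not exist".
    while (1) {
        my $status = `qstat -j $job_id 2>&1`;
        last if $status =~ /Following jobs do not exist/;
        sleep 5;    # placeholder polling interval
    }
    print "SGE job $job_id has left the queue\n";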