1 #!/usr/bin/perl -w
2 #use threads;
3 use strict;
4 use warnings;
5 use DBI;
6 use Time::HiRes qw ( gettimeofday tv_interval );
7 use Galaxy::ShowHelp;
9 ######
10 =pod
11 To do:
12 -m cache lines into $hash{"$fileid_$offset"} so that just need to read 1 times.
14 If sync read over NFS will benifit from NFS cache ...
15 -t time to wait for sync read.
16 1. `date > $opt_o/.Signal/$opt_c.ready`
17 2. wait -t seconds for $opt_o/.Signal/$opt_c.GO
18 3. if -e $opt_o/.Signal/$opt_c.GO {begin reading} else {goto 2}
19 4. after -t seconds, `touch $opt_o/.Signal/$opt_c.GO` & begin reading
20 5. when reading begin, `date >> $opt_o/.Signal/$opt_c.ready`
21 In this way, $opt_o/.Signal/$opt_c.GO can be touch manually, or just wait for -t.
22 -f chrorder file way to wait for sync. Just wait all $opt_c.ready and then GO.
23 -p -f use UDP port -p to communicate(pingpong within this perl) instead of files over NFS.
25 Changelog:
26 0.3.4 only -b -o will trim
27 0.3.5 $sth4 SELECTs strand to ensure 2 lines out
28 0.3.6 filter out by col.10 in PE for soapSNP mis-read with soap2 -g
29 0.3.7 add cache option
31 =cut
32 ######
34 $main::VERSION=0.3.7;
36 our $opts='i:o:c:bvmf';
37 our($opt_i, $opt_o, $opt_c, $opt_v, $opt_b, $opt_m, $opt_f);
39 our $desc='SoapSort library PCR PE Duplication Remover & Merger (Atom Edition)';
40 our $help=<<EOH;
41 \t-i SOAP result list, in format: /^[PS]E\\t/path/to/soap.file\\n\$/
42 \t-c Chromosome name, the single one to parse
43 \t-o Merged output file prefix for a sample, directories must exist
44 \t Output file(with path) will be [{-o}.{-c}]
45 \t-m cache soap file so that no more file reading at output
46 \t-f Filter out reads with gaps on [soap2 -g]
47 \t-v show verbose info to STDOUT
48 \t-b No pause for batch runs
49 EOH
51 ShowHelp();
53 warn "[x]Must specify -i xx.soaplist !\n" unless $opt_i;
54 warn "[x]Must specify -c 'Chromosome Name' !\n" unless $opt_c;
55 warn "[x]Must specify -o ./xx !\n" unless $opt_o;
56 exit 1 unless $opt_i and $opt_o and $opt_c;
58 my $outfile=${opt_o}.'.'.$opt_c;
59 print STDERR "From [$opt_i] to [$outfile]\n";
60 print STDERR "-f on\t" if $opt_f;
61 print STDERR "-m on\t" if $opt_m;
62 print STDERR "\n";
63 #if ($opt_c) {print "Only parse Chromosome [$opt_c].\n";}
64 # else {print "Parsing All Chromosomes.\n";}
65 unless ($opt_b) {print STDERR 'press [Enter] to continue...'; <>;}
67 my $start_time = [gettimeofday];
69 unless (-s $opt_i) {die "[x]Soaplist [$opt_i] is nothing !\n";}
70 my (%FILES);
71 open SAMPLE,'<',$opt_i or die "Error opening $opt_i: $!\n";
72 while (<SAMPLE>) {
73 chomp;
74 my ($pe,$file)=split /\t/;
75 push @{$FILES{$pe}},$file;
77 close SAMPLE;
78 #if ($opt_v) {}
79 #system('mkdir','-p',$opt_o);
80 open OUT,'>',$outfile or die "Cannot create $outfile: $!\n"; # better to check before
82 ### Begin SQL
83 my %attr = (
84 RaiseError => 0,
85 PrintError => 1,
86 AutoCommit => 0 );
87 my $dbh = DBI->connect('dbi:SQLite:dbname=:memory:','','',\%attr) or die $DBI::errstr;
89 my $sql=q/
91 ( soapid TEXT,
92 pos INTEGER,
93 fileid INTEGER,
94 offset INTEGER );
96 ( soapid TEXT,
97 len INTEGER,
98 hit INTEGER,
99 sumQasc INTEGER,
100 pos INTEGER,
101 strand TEXT,
102 realpos INTEGER,
103 fileid INTEGER,
104 offset INTEGER,
105 isTrim INTEGER,
106 chosen INTEGER );
108 # The rowid value can be accessed using one of the special names "ROWID", "OID", or "_ROWID_".
109 for (split /;/,$sql) {
110 next if /^\s*$/;
111 $dbh->do($_) or die $dbh->errstr;
113 $dbh->commit;
115 sub doindex() {
116 my $sql=q/
117 CREATE INDEX IF NOT EXISTS rps ON soap(realpos);
118 CREATE INDEX IF NOT EXISTS sf ON soap(soapid,fileid);
120 for (split /;/,$sql) {
121 next if /^\s*$/;
122 $dbh->do($_) or warn $dbh->errstr;
124 $dbh->commit;
127 my $sthpe = $dbh->prepare( 'INSERT INTO soap ( soapid,pos,fileid,offset,len,hit,sumQasc,strand,realpos,isTrim ) VALUES ( ?,?,?,?,?,?,?,?,?,? )' );
128 my $sthse = $dbh->prepare( 'INSERT INTO se ( soapid,pos,fileid,offset ) VALUES ( ?,?,?,? )' );
129 my $sth0 = $dbh->prepare( 'SELECT DISTINCT soapid,fileid FROM soap;' );
130 my $sth1 = $dbh->prepare( 'SELECT DISTINCT realpos,strand FROM soap WHERE soapid=? AND fileid=? ORDER BY strand ASC;' ); # '+', which comes first, < '-'
131 my $sth2 = $dbh->prepare( 'SELECT DISTINCT soapid,fileid FROM soap WHERE realpos=? AND strand=?;' );
132 my $sth3 = $dbh->prepare( 'UPDATE soap SET chosen=1 WHERE soapid=? AND fileid=?;' );
133 my $sth4 = $dbh->prepare( 'SELECT DISTINCT len,hit,sumQasc,strand FROM soap WHERE soapid=? AND fileid=? ORDER BY strand ASC;' ); # strand must be here as the other maybe same.
134 my $sth5 = $dbh->prepare( 'SELECT DISTINCT soapid,fileid,offset,\'PE\',isTrim,pos FROM soap WHERE chosen=1 UNION
135 SELECT DISTINCT soapid,fileid,offset,\'SE\',0,pos FROM se
136 ORDER BY pos ASC;' ); # order to make a sorted out
138 #0|0|TABLE soap
139 #0|0|TABLE se
140 # cannot use index on single table
142 #my $sth5 = $dbh->prepare( 'SELECT DISTINCT soapid,fileid,offset,pos FROM soap WHERE chosen=1 ORDER BY pos ASC;' ); # order to make a sorted out
143 #my $sth6 = $dbh->prepare( 'SELECT DISTINCT soapid,fileid,offset,pos FROM se ORDER BY pos ASC;' );
144 # no need to CREATE INDEX IF NOT EXISTS c ON soap(chosen) since not so many dups.
146 $|=1;
147 #if ($opt_m) {
148 my ($o_f,$o_fs,%PSE)=(0,0);
150 my ($fileid,%FH,$Offset,$the_time,$isTrim)=(0);
151 my ($allDroped,$maxDroped,$itemsInP,$itemsOutP0,$itemsOutP,$itemsOutS,$itemsInS,$PEtrimed,$SEtrimed)=(0,0,0,0,0,0,0,0,0); # $itemsIn = $allDroped + $itemsOut
152 my ($bpsIP,$bpsOP,$bpsIS,$bpsOS)=(0,0,0,0);
153 my $count;
154 for my $file (@{$FILES{PE}}) {
155 ++$fileid; # count from 1
156 $the_time = [gettimeofday];
157 printf STDERR ">Parsing\033[32;1m PE %3u @ %11.6f\033[0;0m sec.\n[%s] ",$fileid,tv_interval($start_time, $the_time),$file;
158 open $FH{$fileid},'<',$file or die "Error opening PE [$file]: $!\n";
159 $Offset=tell $FH{$fileid};
160 #print STDERR 'p',$fileid;
161 my ($sumQasc,$soapid,$Qstr,$hit,$len,$strand,$chr,$pos,$realpos,@Q,$trim,$trimed,$types);
162 $count=0;
163 while ($_=readline $FH{$fileid}) {
164 # soapSNP 2.20 cannot run with -g, so filter out by col. 10
165 ($soapid,$Qstr,$hit,$len,$strand,$chr,$pos,$types,$trim) = (split(/\t/))[0,2,3,5,6,7,8,9,-2];
166 if ($chr ne $opt_c) {
167 $Offset=tell $FH{$fileid};
168 next;
170 if ($opt_f and length $types > 2) {
171 print "[v]Skipped:[${fileid}_$soapid]\@Chr:$chr,$pos,$strand\t${types}\n" if $opt_v;
172 $Offset=tell $FH{$fileid};
173 ++$o_f;
174 next;
176 ++$count; ++$itemsInP; $bpsIP += $len;
177 $trimed=$sumQasc=0; # must reset to 0 cyclely !
178 $realpos=$pos;
179 if ($strand eq '-') {
180 $realpos += $len; # should be $len-1. So, starting 0. (+ & -, never meets.)
181 if ($trim =~ /(\d+)S$/) {
182 $realpos += $trimed = $1; # $1 only reset after next /()/
184 } elsif ($strand eq '+') {
185 if ($trim =~ /^(\d+)S/) {
186 $realpos -= $trimed = $1;
188 } else { # elsif ($strand ne '+')
189 $realpos=0;
190 print STDERR "[!][${fileid}_$soapid] @ Chr:$chr $pos with wrong strand [$strand]\n";
192 @Q = unpack('C*', $Qstr);
193 $sumQasc += $_ for @Q;
194 #@Q = split //,$Qstr;
195 #for (@Q) { $sumQasc += ord; }
196 if ($trimed > 0) {
197 ++$PEtrimed;
198 $isTrim=1;
199 print "[v]Trimed:[${fileid}_$soapid]\@Chr:$chr,$pos,$strand\t${trim}->\tΔ:$trimed,l:$len,$realpos:",$pos-$realpos,"\tQ:$sumQasc\t$PEtrimed\n" if $opt_v;
200 } else { $isTrim=0; }
201 if ($opt_m) {
202 $PSE{$fileid}{$Offset}=$_;
204 $sthpe->execute($soapid,$pos,$fileid,$Offset,$len,$hit,$sumQasc,$strand,$realpos,$isTrim);
205 $Offset=tell $FH{$fileid};
207 print STDERR $count,"\n";
208 #close $FH{$fileid};
209 $dbh->commit;
211 for my $file (@{$FILES{SE}}) {
212 ++$fileid; # will be the No. of files
213 $the_time = [gettimeofday];
214 printf STDERR ">Parsing\033[32;1m SE %3u @ %11.6f\033[0;0m sec.\n[%s] ",$fileid,tv_interval($start_time, $the_time),$file;
215 open $FH{$fileid},'<',$file or die "Error opening SE [$file]: $!\n";
216 $Offset=tell $FH{$fileid};
217 #print STDERR 's',$fileid;
218 my ($soapid,$len,$chr,$pos);
219 $count=0;
220 while ($_=readline $FH{$fileid}) {
221 ($soapid,$len,$chr,$pos) = (split(/\t/))[0,5,7,8];
222 if ($chr ne $opt_c) {
223 $Offset=tell $FH{$fileid};
224 next;
226 # if ($trim =~ /\dS/) { $isTrim=1; ++$SEtrimed; }
227 # else { $isTrim=0; }
228 ++$count; ++$itemsInS; $bpsIS += $len;
229 if ($opt_m) {
230 $PSE{$fileid}{$Offset}=$_;
232 $sthse->execute($soapid,$pos,$fileid,$Offset);
233 $Offset=tell $FH{$fileid};
235 print STDERR $count,"\n";
236 $dbh->commit;
239 $the_time = [gettimeofday];
240 printf STDERR "-Finish Loading\033[32;1m @ %11.6f\033[0;0m sec.\n",tv_interval($start_time, $the_time);
241 print STDERR 'L';
242 ##FINISH Loading soaps
244 print STDERR "i\b";
245 &doindex; print STDERR 'I';
247 print STDERR "p\b";
248 $sth0->execute; print STDERR 'P';
250 print STDERR "d\b";
251 my ($soapid,%mergeids,$aa,$bb,$rres,$mergeid,$qres); # "my" variable $fileid masks earlier declaration
252 while ($rres = $sth0->fetchrow_arrayref) {
253 ($soapid,$fileid)=@$rres; # soapid,fileid
254 $mergeid=$fileid.'_'.$soapid;
255 ++$mergeids{$mergeid}; # init mark
256 next if $mergeids{$mergeid} > 1; # check mark
257 $sth1->execute($soapid,$fileid); # realpos,strand '+'<'-'
258 $qres = $sth1->fetchall_arrayref;
259 if ($#$qres == 1) { # Normal PE
260 ($aa,$bb)=@$qres; # just mid. value of [realpos,strand]; aa->'+', bb->'-'
261 $sth2->execute($$aa[0],$$aa[1]);
262 $aa=$sth2->fetchall_arrayref; # soapid,fileid
263 $sth2->execute($$bb[0],$$bb[1]);
264 $bb=$sth2->fetchall_arrayref;
265 } elsif ($#$qres > 1) { # more than PE, only if file with -t was merged, next
266 print "[!]Reads:[",1+$#$qres,"], not a PE soap for [$mergeid] ! Ignored.(Maybe soap file with -t was merged with cat or cp)\n";
267 next;
268 } elsif ($#$qres == 0) { # SE, impossable, next
269 if ($opt_f) {
270 print "[!]Reads:[1], is a SE soap for [$mergeid] at strand:$$qres[0][1] ! Ignored. (This is OK since -f used.)\n";
271 ++$o_fs;
272 } else {
273 print "[!]Reads:[1], is a SE soap for [$mergeid] at strand:$$qres[0][1] ! Ignored.(Please use given program to generate input list file, ONLY those soap -b -o are PE.)\n"; # Well, we can INSERT INTO se. But, is robust so important when I am tired ?
275 next;
276 } else { die "[x]System Error. (Memory or DBI)"; } # -1, no such soapid
278 my (%c,@dup,$dmergeid); #=();
279 for (@$aa,@$bb) {
280 $dmergeid=$$_[1].'_'.$$_[0];
281 next if $dmergeid eq $mergeid;
282 ++$c{$dmergeid};
284 for (keys %c) {
285 if ($c{$_}>1) {
286 push @dup,$_;
287 ++$mergeids{$_}; # make mark
288 print "[!][$_] come with $c{$_} > 2 results.\n" if $c{$_}>2;
291 if ($#dup == -1) {
292 $sth3->execute($soapid,$fileid);
293 ++$itemsOutP0;
294 print "[v]+>[$mergeid]\n" if $opt_v;
295 } else {
296 push @dup,$mergeid;
297 $allDroped += $#dup;
298 $maxDroped = $#dup if $maxDroped < $#dup;
299 # There do are cases that: "DBD::SQLite::db selectall_arrayref failed: Expression tree is too large (maximum depth 1000)"
300 my ($csoapid,$cfileid,%candidates,$dsoapid,$dfileid);
301 for $dmergeid (@dup) {
302 ($dfileid,$dsoapid)=split /_/,$dmergeid;
303 $sth4->execute($dsoapid,$dfileid); # len,hit,sumQasc,strand
304 $qres = $sth4->fetchall_arrayref; # ORDER BY strand ASC
305 if ($#$qres != 1) {
306 print "[!]Reads:[",1+$#$qres,"], not a PE soap for [$dmergeid] ! Ignored.(Maybe soap file with -t was merged with cat or cp)\n" if $#$qres > 1; # since $mergeid has pass the check, always with at least 1 out.
307 if ($#$qres == 0) {
308 print "[!]Reads:[1], is a SE soap for [$dmergeid] at strand:$$qres[0][3] ! Ignored.(2)\n"; # EP ?
309 #print "[!]Candidates are: [$dmergeid] of ";
310 #print "[$_]\t" for (@dup);
311 #print "\n";
313 # no more check for $#$qres == -1, we are smoking. Well, half smoking.
314 next; # also pass $#$qres = -1 or 0
316 $candidates{$dmergeid}=[ @$qres ]; # '+' -> [], '-' -> [len,hit,sumQasc]
318 #### $dmergeid = &score_dups(\%candidates); # Let's in_line ...
319 ###### sub score_dups ($)
320 ### Way1: for speed. Output should be the same as Way0.
321 my (%len,%hit,%sumQasc,$len,$hit,$sumQasc,@t,$outT);
322 for (keys %candidates) {
323 $len = $candidates{$_}->[0]->[0] + $candidates{$_}->[1]->[0];
324 push @{$len{$len}},$_;
326 @t=(sort {$b <=> $a} keys %len); # sort numerically descending
327 if ($#{$len{$t[0]}}==0) {
328 if ($opt_v) {
329 $outT=0;
330 print "[v]->:";
331 for $len (@t) {
332 for (@{$len{$len}}) {
333 printf '[v]drop%3u:',$outT if $outT > 0;
334 print " [$_],len:$len\n";
335 ++$outT;
339 $dmergeid=${$len{$t[0]}}[0];
340 goto RETURN; # return $dmergeid;
343 for ( @{$len{$t[0]}} ) {
344 $hit=$candidates{$_}->[0]->[1];
345 push @{$hit{$hit}},$_;
347 @t=(sort {$a <=> $b} keys %hit); # ASC
348 if ($#{$hit{$t[0]}}==0) {
349 if ($opt_v) {
350 $outT=0;
351 print "[v]->:";
352 for $hit (@t) {
353 for (@{$hit{$hit}}) {
354 printf '[v]drop%3u:',$outT if $outT > 0;
355 print " [$_],hit:$hit\n";
356 ++$outT;
360 $dmergeid=${$hit{$t[0]}}[0];
361 goto RETURN; # return $dmergeid;
364 for ( @{$hit{$t[0]}} ) {
365 $sumQasc = $candidates{$_}->[0]->[2] + $candidates{$_}->[1]->[2];
366 push @{$sumQasc{$sumQasc}},$_;
368 @t=(sort {$b <=> $a} keys %sumQasc); # DESC
369 if ($opt_v) {
370 $outT=0;
371 print "[v]->:";
372 for $sumQasc (@t) {
373 for (@{$sumQasc{$sumQasc}}) {
374 printf '[v]drop%3u:',$outT if $outT > 0;
375 print " [$_],sumQasc:$sumQasc\n";
376 ++$outT;
380 $dmergeid=${$sumQasc{$t[0]}}[0];
381 goto RETURN; # return $dmergeid;
382 ### End Way1.
383 =pod
384 ### Way0: for debug.
385 my (%len,%hit,%sumQasc,$len,$hit,$sumQasc,@t,$outT);
386 for (keys %candidates) {
387 $len = $candidates{$_}->[0]->[0] + $candidates{$_}->[1]->[0];
388 $hit = $candidates{$_}->[0]->[1];
389 print "[!]$_ with different hits.\n" if $candidates{$_}->[0]->[1] != $candidates{$_}->[1]->[1];
390 $sumQasc = $candidates{$_}->[0]->[2] + $candidates{$_}->[1]->[2];
391 push @{$len{$len}},$_;
392 push @{$hit{$hit}},$_;
393 push @{$sumQasc{$sumQasc}},$_;
397 ### End Way0.
398 =cut
400 ###### return $dmergeid;
401 %candidates=();
402 ($cfileid,$csoapid)=split /_/,$dmergeid;
403 $sth3->execute($csoapid,$cfileid);
404 ++$itemsOutP0;
407 $dbh->commit;
408 #$dbh->do('CREATE INDEX IF NOT EXISTS c ON soap(chosen);'); if I just query once, no time benifit.
409 print STDERR 'D';
410 ##FINISH deDup.
412 print STDERR "o\b";
413 my ($red,$redid,@aline);
414 #open OUT,'>',$outfile; # opened
415 $sth5->execute; # PE & SE soapid,fileid,offset,'PE'/'SE',isTrim,pos
416 my ($pres,$len);
417 my ($Psoapid,$Pfileid,$Poffset,$PE,$Ppos);
418 while ( $pres=$sth5->fetchrow_arrayref ) {
419 ($Psoapid,$Pfileid,$Poffset,$PE,$isTrim,$Ppos)=@$pres;
420 if ($opt_m) {
421 $red=$PSE{$Pfileid}{$Poffset};
422 } else {
423 seek $FH{$Pfileid},$Poffset,0;
424 $red=readline $FH{$Pfileid};
426 @aline=split(/\t/,$red);
427 ($redid,$len)=@aline[0,5];
428 push @aline,$isTrim;
429 $red=join "\t",@aline[0..9,-1];
430 if ($PE eq 'PE') {
431 $bpsOP += $len; ++$itemsOutP;
432 } else { # $PE eq 'SE'
433 $bpsOS += $len; ++$itemsOutS;
435 print "[!]$Pfileid $PE [$Psoapid] <> [$redid], soap file changed.\n" if $Psoapid ne $redid;
436 print OUT $Pfileid,'_',$red,"\n"; # rename soapid with prefix
438 close OUT;
439 print STDERR 'O';
440 ##FINISH Output.
441 $the_time = [gettimeofday];
442 printf STDERR "\n-Finish Merge\033[32;1m @ %13.6f\033[0;0m sec.\n",tv_interval($start_time, $the_time);
444 close $_ for (values %FH); # SOAP files closed.
445 $dbh->commit or warn $dbh->errstr; # only 1 lib. No cycle, no DROP.
446 $dbh->disconnect; # in memory db, no file to delete.
447 ### End SQL
448 #$allDroped,$maxDroped,$itemsInP,$itemsOutP,$itemsInS,$bps{IP,OP,IS}
449 my ($itemsIn,$itemsOut,$bpsI,$bpsO);
450 $itemsIn=$itemsInP+$itemsInS;
451 $itemsOut=$itemsOutP+$itemsOutS;
452 $bpsI=$bpsIP+$bpsIS;
453 $bpsO=$bpsOP+$bpsOS;
454 my $report = "\033\\[32;1m\nIn Chr:[$opt_c]\t[PE,SE]\n
455 [${itemsInP},$itemsInS] ($itemsIn) lines from SOAP file(s). Trimed [${PEtrimed},${SEtrimed}].
456 [$o_f] PE reads filtered, causing [$o_fs] reads turn to SE and ignored.
458 PE pairs: [$itemsOutP0] chosen, [$allDroped] dropped, Max dropped per dupilcate is [$maxDroped].
460 [$itemsOutP,$itemsOutS] ($itemsOut) lines merged.
462 [$bpsIP,$bpsIS] ($bpsI) bps read in, [$bpsOP,$bpsOS] ($bpsO) bps write out.\033\\[0;0m
464 !!!\t$opt_c\t$bpsO\t$bpsI\t$itemsIn\t$itemsOut\t$itemsInP\t$itemsInS\t$itemsOutP\t$bpsIP\t$bpsIS\t$bpsOP\t$maxDroped\t$allDroped\t!!!\n\n";
465 $report =~ s/\[/[\033[0;0m/g;
466 $report =~ s/\]/\033[32;1m]/g;
467 $report =~ s/\\\[\033\[0;0m/[/g;
468 $report =~ s/\\\033\[32;1m\]/]/g;
469 $report =~ s/\(/(\033[0;0m/g;
470 $report =~ s/\)/\033[32;1m)/g;
471 $report =~ s/\\\(\033\[0;0m/(/g;
472 $report =~ s/\\\033\[32;1m\)/)/g;
473 print STDERR $report;
474 #END
475 my $stop_time = [gettimeofday];
477 print STDERR "\nTime Elapsed:\t",tv_interval( $start_time, $stop_time )," second(s).\n";
478 __END__