modified: src1/input.c
[GalaxyCodeBases.git] / BGI / BASE / src / contigs.c
blob9fb29eeda379c88b4b1b4deb96ba682d602ae4de
1 /*
3 contigs.cpp assemble contigs in BASE.
5 # Copyright (C) 2015, The University of Hong Kong.
7 # This program is free software; you can redistribute it and/or
8 # modify it under the terms of the GNU General Public License
9 # as published by the Free Software Foundation; either version 3
10 # of the License, or (at your option) any later version.
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License
18 # along with this program; if not, write to the Free Software
19 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 03110-1301, USA.
21 Date : 28th Jan 2016
22 Author : Binghang Liu, bhliu@cs.hku.hk
25 #include "contigs.h"
26 #include <sys/resource.h>
28 struct rusage usage;
30 FILE* fci;
31 FILE* fcs;
32 FILE* fcm;
33 FILE* fcq;
34 FILE* logs;
36 int blank_cycle_time=0;
38 char* types[]={"LowDepth1", "LowDepth2", "HighDepth", "UsedRead1", "ManyIter", "SameSeed", "UsedRead2", "CB", "R2U", "CurrentUse1", "Cut", "CurrentUse2"};
40 void get_time_rss(double *utime, double *stime, int* maxrss)
42 if(getrusage(RUSAGE_SELF,&usage)){ return; }
43 *utime = 1e-6 * usage.ru_utime.tv_usec + usage.ru_utime.tv_sec;
44 *stime = 1e-6 * usage.ru_stime.tv_usec + usage.ru_stime.tv_sec;
45 *maxrss = usage.ru_maxrss;
48 void print_maxrss_and_time(){
50 if(getrusage(RUSAGE_SELF,&usage)){ return; }
51 double utime = 1e-6 * usage.ru_utime.tv_usec + usage.ru_utime.tv_sec;
52 double stime = 1e-6 * usage.ru_stime.tv_usec + usage.ru_stime.tv_sec;
53 fprintf(stderr, "%s:\t", __FILE__);
54 fprintf(stderr, "time\t= %lf (usr)\t+ %lf (sys)\t= %lf (sec)\t",utime , stime, utime + stime);
55 fprintf(stderr, "maxrss\t= %d\n", usage.ru_maxrss);
59 unsigned long long get_initial_read(unsigned long long startp, unsigned long long pair_num, int flag, int *c, int size)
61 unsigned long long i;
62 int count= *c, f1;
63 for(i=startp; i< pair_num; i++)
65 count++;
66 f1 = is_used_bwt_sparse(i, size);
68 if(f1 == 1 - flag )
69 continue;
71 if(f1 == flag)
73 *c = count;
74 return i;
77 *c = count;
78 return 0;
81 void print_contig(FILE* fcs, FILE* fcq, Contig *ctg, int id, int ctg_count)
83 fprintf(fcs, ">UC%d %d %d\n", ctg_count, ctg->len, id);//, ctg->seq);
84 int i;
85 for(i=0; i< ctg->len; i++)
87 fprintf(fcs, "%c", ctg->seq[i]);
88 if((i+1) %100 == 0)
90 fprintf(fcs, "\n");
93 if(i%100 != 0)
94 fprintf(fcs, "\n");
97 void complemant_seq(char* seq, int len)
99 int i;
100 for(i=0; i<len ; i++)
102 seq[i] = complementMap[ seq[i] ];
106 void reverse_seq(char* seq, int len)
108 char s[len];
109 int i,j;
110 for(i=0; i<len ; i++)
112 s[i] = complementMap[ seq[len-i-1]];
114 for(i=0; i<len ; i++)
116 seq[i] = s[i];
120 void reverse_contigs(Contig *ctg, int threadId)
122 char seq[ctg->len];
123 int i,j;
124 int num = tmpCount[threadId];
126 for(i=0; i<ctg->len ; i++)
128 seq[i] = complementMap[ ctg->seq[ctg->len-i-1]];
131 for(i=0; i<ctg->len ; i++)
133 ctg->seq[i] = seq[i];
136 if(num == 0)
137 return ;
139 j=0;
140 for(i=0; i< num; i++)
142 if(tmpRead[threadId][i].pos <= Read_length )
143 continue;
144 tmpRead[threadId][j] = tmpRead[threadId][i];
145 tmpRead[threadId][j].pos = int(ctg->len) + Read_length - tmpRead[threadId][i].pos;
146 tmpRead[threadId][j].strand = 1 - tmpRead[threadId][i].strand;
147 j++;
149 num = j;
151 TmpRead tmp;
152 for(j=0; j<= num/2; j++)
154 if(j >= num-1-j)
155 break;
156 tmp = tmpRead[threadId][j];
157 tmpRead[threadId][j] = tmpRead[threadId][num-1-j];
158 tmpRead[threadId][num-1-j] = tmp;
160 tmpCount[threadId] = num;
163 void update_readPos(TmpRead* tmpRead, int initial, int final)
165 int i;
166 for(i=initial; i< final; i++)
168 tmpRead[i].pos -= Read_length;
169 tmpRead[i].strand = 1 - tmpRead[i].strand;
173 void print_read(FILE* fp, TmpRead* tmpRead, int startp, int count)
175 int i;
176 fprintf(fp, ">C%d %d\n", startp, count);
177 for(i=0; i< count; i++)
179 fprintf(fp, "%d %d %d %d %d %d\n", i, tmpRead[i].id, tmpRead[i].strand, tmpRead[i].pos, tmpRead[i].iter, tmpRead[i].cons);
183 void print_g_iters(FILE* fp, Iter* iters, int startp, int count)
185 int i;
186 //fprintf(fp, ">C%d %d\n", startp, count);
187 for(i=0; i< count; i++)
189 fprintf(fp, "SEED%d\t%s\t%.1f\t%d\t%d\t%d\t%d\t%d\t%d\n", i, iters[i].seq, double(Read_length*iters[i].depth)/double(Read_length-iters[i].slen+1), iters[i].slen, iters[i].depth, iters[i].len, iters[i].rnum, iters[i].end, iters[i].pe);
193 void print_single_iter(FILE* fp, Iter* iters, int i)
195 fprintf(fp, "%d\t%s\t%d\t%d\t%d\t%d\t%d\t%d\n", i, iters[i].seq, iters[i].slen, iters[i].depth, iters[i].len, iters[i].rnum, iters[i].end, iters[i].pe);
198 void reverse_seq(char* seq)
200 int i, len = strlen(seq);
201 char rc[len];
202 for(i=0; i< len; i++)
204 rc[i] = complementMap[ seq[len-i-1]];
206 for(i=0; i< len; i++)
207 seq[i] = rc[i];
211 int is_cons(TmpRead* tmpRead, int count, int startp)
213 int i,j;
214 for(i=0; i< count; i++)
216 if(tmpRead[i].id == startp && tmpRead[i].keep == 1)
217 return 1;
219 return 0;
222 void print_read_seq(FILE* fp, TmpRead* tmpRead, int startp, int count)
224 fprintf(fp, ">C%d %d\n", startp, count);
225 int i,j;
226 char seq[101];
227 int qual[101];
228 for(i=0; i< count; i++)
230 get_id_seq(seq, qual, tmpRead[i].id, tmpRead[i].size);
231 if(tmpRead[i].strand == 0)
233 reverse_seq(seq);
235 for(j=0; j< tmpRead[i].pos; j++)
237 fprintf(fp, " ");
239 if(tmpRead[i].pos >= 0)
240 fprintf(fp, "%s %d %d %d %d %d\n", seq, tmpRead[i].id, tmpRead[i].strand, tmpRead[i].pos, tmpRead[i].iter, tmpRead[i].unique);
241 else
242 fprintf(fp, "%s %d %d %d %d %d\tCUT:%d\n", seq, tmpRead[i].id, tmpRead[i].strand, tmpRead[i].pos, tmpRead[i].iter, tmpRead[i].unique, - int(tmpRead[i].pos));
246 void clean_uniq(int threadId)
248 int i;
249 for(i=0; i< ctgs[threadId][ ctg_num[threadId] ].len; i++)
250 ctgs[threadId][ ctg_num[threadId] ].uniq[i] = '0';
253 void uniqer_single_seq(int threadId)
255 int end = ctgs[threadId][ ctg_num[threadId] ].len - 1, start, flag;
256 unsigned long long depth;
257 int step = depth_6_length > MINOVERLAP ? depth_6_length - MINOVERLAP : MINOVERLAP;
258 int break_start=0;
259 while(end > start + MINOVERLAP)
261 depth = 0;
262 flag = extend_backwards(ctgs[threadId][ ctg_num[threadId] ].seq, 0, end, tmpbase[threadId], &(depth), &break_start);
263 if(flag != -1 && end-flag < 79)
265 ctgs[threadId][ ctg_num[threadId] ].uniq[flag] = end - flag + '0';
266 end --;
267 }else
268 end -= step;
272 void uniqer_single_seq_initial(int threadId)
274 int end = ctgs[threadId][ ctg_num[threadId] ].len - 1, start, flag, break_start;
275 unsigned long long depth;
276 while(end > start + MINOVERLAP)
278 depth = 0;
279 flag = extend_backwards(ctgs[threadId][ ctg_num[threadId] ].seq, 0, end, tmpbase[threadId], &(depth), &break_start);
280 if(flag != -1 )
282 ctgs[threadId][ ctg_num[threadId] ].uniq[flag] = end - flag + '0';
284 end --;
289 int is_rich(char* u, int s, int len, int flag)
291 int i, zero=0;
292 int check_win, cret_win;
293 if(flag == 1)
295 check_win = MINOVERLAP/2;
296 cret_win = MINOVERLAP/4;
297 }else{
298 check_win = 5;
299 cret_win = 2;
301 //at least 5 28bp.
302 for(int i=s+1; i< len && i< s + check_win; i++)
304 if(flag == 1 && u[i] == '0')
305 zero++;
306 if(flag == 0 && u[i] != '0')
307 zero++;
309 if(zero > cret_win)
310 return 0;
311 return 1;
314 int get_next(char* u, int s, int len)
316 int i;
317 for(i=s+1; i<len-1; i++)
319 if(u[i] - '0' == MINOVERLAP-1 && is_rich(u, i, len, 1) == 1)
320 return i;
322 return -1;
325 int get_next_zero(char* u, int s, int len)
327 int i;
328 for(i=s+1; i<len-1; i++)
330 if(u[i] == '0' && is_rich(u, i, len, 0) == 1)
331 return i;
333 return -1;
336 int get_next_one(char* u, int s, int len)
338 int i;
339 for(i=s+1; i<len-1; i++)
341 if(u[i] != '0' && is_rich(u, i, len, 1) == 1)
342 return i;
344 return -1;
347 void remove_island(char* u, int len)
349 int i, next_i, j, min_island_size = MINOVERLAP;
350 for(i=0; i< len ;i++)
352 if(u[i] != '0')
354 next_i = get_next_zero(u, i, len);
355 if(next_i == -1)
357 for(j = i; j< len ; j++)
359 if(u[j] == '0')
360 u[j] = '0' + 1;
362 i = len;
363 break;
366 if(next_i - i < min_island_size)
368 for(j=i; j< next_i; j++)
370 if(u[j] == '0')
371 continue;
372 u[j] = '0';
374 }else{
375 for(j=i+1; j< next_i; j++)
377 if(u[j] == '0')
378 u[j] = '0' + 1;
381 i = next_i;
386 void keep_repeat_end(char* u, int len)
388 int i, j, next_i, keep_len = MINOVERLAP;
389 for(i=0; i< len ;i++)
391 if(u[i] == '0')
393 next_i = get_next_one(u, i, len);
394 if(next_i == -1)
396 for(j = i; j< i+keep_len && j<len ; j++)
398 if(u[j] == '0')
399 u[j] = '0' + 1;
401 i=len;
402 break;
405 if(next_i - i < Read_length -10)
407 for(j=i; j< next_i; j++)
409 if(u[j] == '0')
410 u[j] = '0' + 1;
412 }else{
413 int tmp_keep_len = (next_i - i - 10)/2;
414 if(tmp_keep_len > keep_len)
415 tmp_keep_len = keep_len;
417 for(j=i; i>keep_len && j< i + tmp_keep_len; j++)
419 if(u[j] == '0')
420 u[j] = '0' + 1;
422 for(j=next_i - 1; j>= next_i - tmp_keep_len; j--)
424 if(u[j] == '0')
425 u[j] = '0' + 1;
427 for(; j>= i+tmp_keep_len || (i==0 && j> 0); j--)
429 if(u[j] != '0')
430 u[j] = '0';
433 i = next_i;
438 void uniqer2region(char* u, int len)
440 int i, zero = MINOVERLAP, next_i, now_i, j;
441 for(i=0; i< len ;i++)
443 if(u[i] - '0' == MINOVERLAP-1)
445 zero = 0;
446 }else{
447 next_i = get_next(u, i, len);
448 if(next_i == -1)
450 for(j = i; j< len ; j++)
451 u[j] = '0';
452 i = len;
453 break;
456 for(j=i+1; j< next_i-1; j++)
458 if(u[j] == '0')
459 continue;
460 u[j] = '0';
463 now_i = next_i;
464 char c;
465 for(j=now_i-1; j>=i; j--)
467 if(now_i - j > Read_length - 1 - (u[now_i] - '0'))
468 break;
469 c = u[now_i] + (now_i - j);
470 if(u[j] > c || u[j] == '0')
471 u[j] = c;
472 else
473 now_i = j;
476 i = next_i;
481 void print_single_uniq(FILE* fo, char* u, char* id, int len)
483 fprintf(fo, ">%s %d\n", id, len);
484 int i, zero = MINOVERLAP, next_i, now_i, j;
486 for(i=0; i< len ;i++)
488 if(u[i] - '0' == MINOVERLAP-1)
490 zero=0;
491 fprintf(fo, "%c", u[i]);
492 if((i+1)%100 == 0)
493 fprintf(fo, "\n");
494 }else
496 next_i = get_next(u, i, len);
497 if(next_i == -1)
499 for(j = i; j< len ; j++)
501 fprintf(fo, "%c", '0');
502 if((j+1)%100 == 0)
503 fprintf(fo, "\n");
505 i = len;
506 break;
509 for(j=i+1; j< next_i-1; j++)
511 if(u[j] == '0')
512 continue;
513 u[j] = '0';
516 now_i = next_i;
517 char c;
518 for(j=now_i-1; j>=i; j--)
520 if(now_i - j > Read_length - 1 - (u[now_i] - '0'))
522 break;
524 c = u[now_i] + now_i - j;
525 if(u[j] > c || u[j] == '0')
527 u[j] = c;
528 }else{
529 now_i = j;
533 for(j=i; j<= next_i; j++)
535 if(u[j] > '0')
536 fprintf(fo, "%c", u[j]);
537 else
538 fprintf(fo, "%c", '0');
539 if((j+1)%100 == 0)
540 fprintf(fo, "\n");
542 i = next_i;
545 if(i%100 != 0)
546 fprintf(fo, "\n");
549 void print_ctgs(FILE* fcs, FILE* fcq, FILE* fcm, FILE* flog)
551 int i, j, k;
552 for(i=0; i< threadNum; i++)
554 for(j=0; j< ctg_num[i]; j++)
556 fprintf(flog, "UC%d %d %d %d %s,%s %d,%d %d,%d %d\n", g_count, ctgs[i][j].id, ctgs[i][j].s_len, ctgs[i][j].s_depth, types[ctgs[i][j].back_end], types[ctgs[i][j].forw_end],
557 ctgs[i][j].back_iter, ctgs[i][j].forw_iter, ctgs[i][j].back_rc, ctgs[i][j].forw_rc, ctgs[i][j].len);
558 fprintf(fcs, ">UC%d %d %lld\n", g_count, ctgs[i][j].len, ctgs[i][j].id);
559 fprintf(fcm, ">UC%d\n", g_count);
560 g_count++;
561 for(k=0; k< ctgs[i][j].len; k++)
563 fprintf(fcs, "%c", ctgs[i][j].seq[k]);
564 if(ctgs[i][j].uniq[k] != '0')
565 fprintf(fcm, "%c", ctgs[i][j].seq[k]);
566 else
567 fprintf(fcm, "N");
568 if((k+1) %100 == 0)
570 fprintf(fcs, "\n");
571 fprintf(fcm, "\n");
574 if(k%100 != 0)
576 fprintf(fcs, "\n");
577 fprintf(fcm, "\n");
579 ctgs[i][j].len = 0;
580 ctgs[i][j].seq[0] = '\0';
582 ctg_num[i] = 0;
586 void print_ctg(FILE* fcs, FILE* fcq, FILE* flog)
588 int i, j, k;
589 for(i=0; i< threadNum; i++)
591 for(j=0; j< ctg_num[i]; j++)
593 fprintf(flog, "UC%d %d %d %d %s,%s %d,%d %d,%d %d\n", g_count, ctgs[i][j].id, ctgs[i][j].s_len, ctgs[i][j].s_depth, types[ctgs[i][j].back_end], types[ctgs[i][j].forw_end],
594 ctgs[i][j].back_iter, ctgs[i][j].forw_iter, ctgs[i][j].back_rc, ctgs[i][j].forw_rc, ctgs[i][j].len);
595 fprintf(fcs, ">UC%d %d %lld\n", g_count, ctgs[i][j].len, ctgs[i][j].id);
596 g_count++;
597 for(k=0; k< ctgs[i][j].len; k++)
599 fprintf(fcs, "%c", ctgs[i][j].seq[k]);
600 if((k+1) %100 == 0)
602 fprintf(fcs, "\n");
605 if(k%100 != 0)
606 fprintf(fcs, "\n");
607 ctgs[i][j].len = 0;
608 ctgs[i][j].seq[0] = '\0';
610 ctg_num[i] = 0;
614 unsigned long long get_next_bwt_id(unsigned long long startp, int size, unsigned long long *c)
616 unsigned long long i;
617 int count= *c, f1, f2, cons=1;
619 if(startp % 2== 0)
621 count++;
622 if(is_used_bwt_sparse(startp+1, size) == 0)
623 return startp+1;
624 }else{
625 startp --;
628 for(i=startp + 2* gthreadNum; i< pair_num; i+= 2*gthreadNum)
630 count++;
631 if(is_used_bwt_sparse(i, size) == 0)
633 *c = count;
634 return i;
636 count++;
638 if(is_used_bwt_sparse(i+1, size) == 0)
640 *c = count;
641 return i+1;
644 *c = count;
645 return 0;
648 int get_next_in_conf(Id* conf, int conf_num, unsigned long long* id, int *size, int *count)
650 int i=*count+1, f1=0, cons=0;
651 for(; i< conf_num; i++)
653 if(conf[i].c == 0)
654 continue;
655 if(is_used_bwt_sparse(conf[i].c, *size) == 0 )
657 *count = i;
658 *id = conf[i].c;
659 conf[i].c = 0;
660 return 1;
662 conf[i].c = 0;
664 *count = i;
665 return 0;
668 void slim_conf(Id* conf, int* conf_num)
670 int count = *conf_num;
671 int i, j;
672 for(i=0, j=0; i< count; i++)
674 if(conf[i].c == 0 || conf[j-1].c == conf[i].c)
675 continue;
676 conf[j].c = conf[i].c;
677 conf[i].c = 0;
678 j++;
680 *conf_num = j;
683 void add_conflict_buffer(unsigned long long start_id, int size, int threadId, int count)
685 if(conf_size[threadId] == buffer_size)
687 //if(count < 0)
689 // fprintf(stderr, "buffer Error: thread %d\n", threadId);
690 // exit(1);
692 conf_buffer[threadId][count+1].c = (ULL)start_id;
693 conf_buffer[threadId][count+1].size = (ULL)size;
694 }else{
695 conf_buffer[threadId][ conf_size[threadId] ].c = (ULL)start_id;
696 conf_buffer[threadId][ conf_size[threadId] ].size = (ULL)size;
697 conf_size[threadId]++;
701 void get_next_id(int threadId, unsigned long long* id, int* size, int *count)
703 unsigned long long start_id;
704 int start_size, prev_count;
705 if(*id != 0)
707 start_id = *id;
708 start_size = *size;
709 }else{
710 start_id = current_id[threadId].c;
711 start_size = current_id[threadId].size;
713 //find in conf_buffer.
714 if(conf_size[threadId] >= buffer_size - 1)
716 prev_count = *count + 1;
717 int find_flag = get_next_in_conf(conf_buffer[threadId], conf_size[threadId], &start_id, &start_size, count);
718 endInf[threadId].c += *count - prev_count;
719 if(*count == conf_size[threadId])
721 slim_conf(conf_buffer[threadId], &(conf_size[threadId]));
722 *count = -1;
724 if(find_flag == 1)
726 *id = start_id;
727 *size = start_size;
728 return ;
730 start_id = current_id[threadId].c;
731 start_size = current_id[threadId].size;
734 //find in bwt.
735 *id = get_next_bwt_id(start_id, start_size, &(endInf[threadId].c));
736 *size = start_size;
738 if(*id == 0 && conf_size[threadId] > 0)
740 prev_count = *count + 1;
741 int find_flag = get_next_in_conf(conf_buffer[threadId], conf_size[threadId], &start_id, &start_size, count);
742 endInf[threadId].c += *count - prev_count;
744 if(*count == conf_size[threadId])
746 slim_conf(conf_buffer[threadId], &(conf_size[threadId]));
747 *count = -1;
750 if(find_flag == 1)
752 *id = start_id;
753 *size = start_size;
754 return ;
758 current_id[threadId].c = *id;
759 current_id[threadId].size = *size;
763 int layout_single_contig_thread(unsigned long long startp, int size, int threadId)
765 clock_t start_time, now_time;
767 if(DEBUG)
769 int id = 20900; //4302;//64416; //1520;
770 if(startp > id)
772 print_ctg(fcs, fcq, logs);
773 exit(4);
776 if(startp != id)
777 return 0;
779 ctgs[threadId][ctg_num[threadId]].id = startp;
780 initial_contig_seed(stdout, startp, size, threadId);
782 if(DEBUG)
784 fprintf(stdout, "Starting from %d", startp);
785 fprintf(stdout, " %d,%d\n", g_iters[threadId][0].slen, g_iters[threadId][0].depth);
787 g_iters[threadId][0].rnum =0; g_iters[threadId][0].end = 0; g_iters[threadId][0].pe=0; g_iters[threadId][0].len=0;
789 if(g_iters[threadId][0].depth == 0)
791 ctgs[threadId][ctg_num[threadId]].len=0; ctgs[threadId][ctg_num[threadId]].seq[0]='\0';
792 return 0;
794 double utime0=0, utime1=0, stime0=0, stime1=0;
795 int maxrss0=0, maxrss1=0;
797 if(DEBUG)
799 fprintf(stderr, "For contig starting from %lld\n", startp);
800 get_time_rss(&utime0, &stime0, &maxrss0);
802 tmpCount[threadId] = 0;
803 iterCount[threadId] = 0;
805 four_iter[threadId] = 0;
806 four_iter_checked_read[threadId] = 0;
807 four_iter_used_read[threadId] = 0;
809 ctgs[threadId][ctg_num[threadId]].s_len = g_iters[threadId][0].slen;
810 ctgs[threadId][ctg_num[threadId]].s_depth = g_iters[threadId][0].depth;
812 int flag = backward_extending_bwtpe(stdout, 1, threadId);
813 if(DEBUG)
815 get_time_rss(&utime1, &stime1, &maxrss1);
816 fprintf(stderr, "utime diff %.4f, stime diff %.4f, maxrss diff %d\n", utime1 - utime0, stime1 - stime0, maxrss1 - maxrss0);
818 if(flag == 1)
820 ctgs[threadId][ctg_num[threadId]].len=0; ctgs[threadId][ctg_num[threadId]].seq[0]='\0';
821 return 1;
823 int plus_iter = iterCount[threadId] - 1;
824 ctgs[threadId][ctg_num[threadId]].back_end = g_iters[threadId][ plus_iter ].end;
825 ctgs[threadId][ctg_num[threadId]].back_iter = iterCount[threadId];
827 reverse_contigs(&(ctgs[threadId][ctg_num[threadId]]), threadId);
828 ctgs[threadId][ctg_num[threadId]].back_rc = tmpCount[threadId];
830 if(DEBUG) fprintf(stdout, "\nReverse\n");
832 reverse_com(g_iters[threadId][0].seq, g_iters[threadId][ iterCount[threadId] ].seq, g_iters[threadId][0].slen);
833 unsigned long long depth =0;
835 set_SAranges(g_iters[threadId][ iterCount[threadId] ].seq, 0, g_iters[threadId][0].slen-1, g_iters[threadId][ iterCount[threadId] ].b, &(depth));
836 g_iters[threadId][ iterCount[threadId] ].slen = g_iters[threadId][0].slen;
838 if(depth < MAXDEPTH)
839 g_iters[threadId][ iterCount[threadId] ].depth = depth;
840 else
841 g_iters[threadId][ iterCount[threadId] ].depth = MAXDEPTH;
843 g_iters[threadId][ iterCount[threadId] ].rnum =0; g_iters[threadId][ iterCount[threadId] ].end = 0; g_iters[threadId][ iterCount[threadId] ].pe=0; g_iters[threadId][ iterCount[threadId] ].len=0;
845 flag = backward_extending_bwtpe(stdout, 1, threadId);
846 if(DEBUG)
848 get_time_rss(&utime1, &stime1, &maxrss1);
849 fprintf(stderr, "utime diff %.4f, stime diff %.4f, maxrss diff %d\n", utime1 - utime0, stime1 - stime0, maxrss1 - maxrss0);
851 if(flag == 1)
853 ctgs[threadId][ctg_num[threadId]].len=0; ctgs[threadId][ctg_num[threadId]].seq[0]='\0';
854 return 1;
856 ctgs[threadId][ctg_num[threadId]].forw_end = g_iters[threadId][ iterCount[threadId] - 1 ].end;
857 ctgs[threadId][ctg_num[threadId]].forw_iter = iterCount[threadId] - plus_iter;
858 ctgs[threadId][ctg_num[threadId]].forw_rc = tmpCount[threadId] - ctgs[threadId][ctg_num[threadId]].back_rc;
860 if(tmpCount[threadId] > 0)
861 update_readPos(tmpRead[threadId], 0, tmpCount[threadId]);
864 if(tmpCount[threadId] > 0)
866 print_read(fci, tmpRead[threadId], startp, tmpCount[threadId]);
870 if(DEBUG)
872 if(iterCount[threadId] > 0)
874 fprintf(stdout, ">C%d\n", startp);
875 //print_single_iter(stdout, g_iters, plus_iter); print_single_iter(stdout, g_iters, iter-1);
876 print_g_iters(stdout, g_iters[threadId], startp, iterCount[threadId]);
879 //fflush(stdout);
880 if(ctgs[threadId][ctg_num[threadId]].len > 0)
881 complemant_seq(ctgs[threadId][ctg_num[threadId]].seq, ctgs[threadId][ctg_num[threadId]].len);
883 if(ctgs[threadId][ctg_num[threadId]].len >=Read_length && ctgs[threadId][ctg_num[threadId]].len <= 1.5 * Read_length)
885 if(is_cons(tmpRead[threadId], tmpCount[threadId], startp) == 0
886 || (g_iters[threadId][ iterCount[threadId] - 1 ].end == 6 && g_iters[threadId][ plus_iter ].end == 6)
887 || (ctgs[threadId][ctg_num[threadId]].len <= 1.1* Read_length && (g_iters[threadId][ iterCount[threadId] - 1 ].end == 6 || g_iters[threadId][ plus_iter ].end == 6))
890 ctgs[threadId][ctg_num[threadId]].len = 0; ctgs[threadId][ctg_num[threadId]].seq[0] = '\0';
891 clean_trimed_reads(ctgs[threadId][ctg_num[threadId]].len, tmpCount[threadId], threadId);
895 if(RepeatMask == 1 && ctgs[threadId][ctg_num[threadId]].len >= Read_length)
897 clean_uniq(threadId);
898 uniqer_single_seq_initial(threadId);
899 remove_island(ctgs[threadId][ctg_num[threadId]].uniq, ctgs[threadId][ctg_num[threadId]].len);
902 if(DEBUG) fprintf(stdout, "Finished, length=%d\n", ctgs[threadId][ctg_num[threadId]].len);
903 free_thread_tmpRead(threadId, 0);
905 if(DEBUG)
907 get_time_rss(&utime1, &stime1, &maxrss1);
908 fprintf(stderr, "utime diff %.4f, stime diff %.4f, maxrss diff %d\n", utime1 - utime0, stime1 - stime0, maxrss1 - maxrss0);
910 return 0;
913 void extending_thread(int threadId)
915 if(threadEnd[threadId] == 1)
916 return ;
917 unsigned long long start_id=0;
918 clock_t start_time, now_time;
919 int size=0, count=-1;
920 get_next_id(threadId, &start_id, &size, &count);
922 while(start_id > 0 )
924 ctgs[threadId][ ctg_num[threadId] ].len = 0;
926 int has_conflict = layout_single_contig_thread(start_id, size, threadId);
928 endInf[threadId].uc ++;
929 endInf[threadId].len += ctgs[threadId][ ctg_num[threadId] ].len;
931 if(ctgs[threadId][ ctg_num[threadId] ].len > 2* Read_length)
932 endInf[threadId].c = 0;
933 else if(has_conflict == 0){
934 endInf[threadId].c ++;
936 if(endInf[threadId].c > count_pair_thread )//|| (endInf[threadId].uc > count_pair_thread && endInf[threadId].len/endInf[threadId].uc <= MINOVERLAP))
938 break;
941 if(gthreadNum == 1 && start_id > count_pair)
942 break;
943 if(has_conflict == 1)
945 add_conflict_buffer(start_id, size, threadId, count);
946 }else if(ctgs[threadId][ ctg_num[threadId] ].len >= Read_length)
948 endInf[threadId].ulen += ULL(ctgs[threadId][ ctg_num[threadId] ].len);
949 ctg_num[threadId] ++;
950 if(ctg_num[threadId] >= buffer_size)
951 break;
953 get_next_id(threadId, &start_id, &size, &count);
955 if(start_id == 0)
957 threadEnd[threadId] = 1;
958 fprintf(stderr, "Thread: %d finished.\n", threadId);
959 }else{
960 fprintf(stderr, "Thread%d buffer full\n", threadId);
964 void* threadRoutine(void* threadId_p)
966 int threadId = *((int*) threadId_p);
967 //check the status of each thread.
968 while (1)
970 if (signal[threadId] == 0)
972 usleep(1);
973 continue;
974 }else if (signal[threadId] == 1)
976 extending_thread(threadId);
977 signal[threadId] = 0;
978 }else
979 break; //unkown signal.
984 static void creatThrds ( pthread_t * threads , int* threadId)
986 unsigned char i;
987 int temp;
988 for(i=0; i< threadNum; i++)
990 threadId[i]=i;
991 signal[i] = 0;
993 for ( i = 0; i < threadNum; i++ )
995 if ( ( temp = pthread_create ( threads + i, NULL, threadRoutine, (void*)(threadId + i) ) ) != 0 )
997 fprintf ( stderr, "Create threads failed.\n" );
998 exit ( 1 );
1002 fprintf ( stderr, "%d thread(s) initialized.\n", threadNum );
1005 void sendSignal(int SIG)
1007 int i;
1008 for (i=0; i<threadNum; i++)
1010 signal[i] = SIG;
1013 //checking the signal of each thread.
1014 if(SIG == 1)
1016 while (1)
1018 usleep(1);
1020 for (i=0; i<threadNum; i++)
1022 if (signal[i] != 0)
1023 break;
1026 if (i == threadNum)
1027 break;
1029 }else if(SIG == 2)
1031 for(i=0; i< threadNum; i++)
1033 pthread_join ( threads[i], NULL );
1036 //after return, all the singal=0.
1039 void initial_current(int size)
1041 int i, j;
1042 for(i=0, j=0; i< threadNum; i++, j+=2)
1044 current_id[i].c = j;
1045 current_id[i].size = size;
1046 threadEnd[i] = 0;
1050 int is_all_end()
1052 int i, all_end = 1;
1053 for(i=0; i< threadNum; i++)
1055 if(threadEnd[i] == 0)
1057 all_end = 0;
1058 break;
1061 return all_end;
1063 int len_dif_zero;
1064 int is_terminate(int to_single)
1066 int i;
1067 unsigned long long total_c=0, total_used=0, total_len=0, total_usedc=0, total_usedlen = 0;
1069 for(i=0; i< threadNum; i++)
1071 total_c += endInf[i].c;
1072 total_used += endInf[i].ur;
1073 total_len += endInf[i].len;
1074 total_usedc += endInf[i].uc;
1075 total_usedlen += endInf[i].ulen;
1076 if(gthreadNum != 1)
1078 endInf[i].len = 0; endInf[i].uc = 0;
1082 double current_read_ratio = double(total_used)/double(pair_num)*100.0;
1083 if(total_usedc == 0)
1084 return 1;
1085 int avg_len = total_len/total_usedc;
1086 double len_dif = 1;
1087 double read_ratio_dif = current_read_ratio - read_ratio;
1089 if(ass_len > 0)
1090 len_dif = double(total_usedlen - ass_len)/double(ass_len);
1092 if(gthreadNum == 1)
1094 if(current_id[0].c < count_pair)
1096 fprintf(stderr, "curr id %lld and count pair %lld\n", current_id[0].c, count_pair);
1097 blank_cycle_time ++;
1098 if(blank_cycle_time < 10 || read_ratio_dif > 0.0001)
1099 return 0;
1101 blank_cycle_time = 0;
1102 if(avg_len >= old_avg_len && avg_len <= MINOVERLAP)
1103 is_platform ++;
1104 else if(avg_len < old_avg_len)
1105 is_platform = 0;
1107 if(len_dif < MINLENDIF)
1108 len_dif_zero++;
1109 else
1110 len_dif_zero=0;
1111 fprintf(stderr, "Stat: %lld %d %f %f %lld %d %lld %f %d %d\n", count_pair, total_c, current_read_ratio, read_ratio_dif, total_len, total_usedc, ass_len, len_dif, avg_len, len_dif_zero);
1112 if(read_ratio_dif < MINRATEDIF || (len_dif < MINLENDIF && len_dif_zero >= 2) || total_c > increase || is_platform > 2)//|| (is_platform == 1 && avg_len < old_avg_len && avg_len < Read_length/2))
1114 fprintf(stderr, "\nStop extension: %f < %f, or %d > %d, or platform=%d \n", read_ratio_dif, MINRATEDIF, total_c, increase/2, is_platform);
1115 return 1;
1117 count_pair += increase;
1118 endInf[0].len = 0; endInf[0].uc = 0;
1119 read_ratio = current_read_ratio;
1120 ass_len = total_usedlen;
1121 old_avg_len = avg_len;
1122 }else{
1123 if(avg_len >= old_avg_len && avg_len <= MINOVERLAP)
1124 is_platform ++;
1125 else if(avg_len < old_avg_len)
1126 is_platform = 0;
1128 fprintf(stderr, "Stat: %d %lld %lld %d %d %lld %f %f %f\n", total_c, total_used, total_len, total_usedc, avg_len, total_usedlen, len_dif, current_read_ratio, read_ratio_dif);
1129 if((read_ratio_dif < MINRATEDIF && (current_read_ratio > 50 || read_ratio_dif == 0)) || is_platform > 2 || (read_ratio_dif < MINRATEDIF*threadNum && is_platform >=2) || len_dif < MINLENDIF)
1130 return 1;
1132 if(current_read_ratio > 80.5 || (read_ratio_dif < 0.1 && current_read_ratio > 50))// || to_single == 1)
1134 for(i=1; i< threadNum; i++)
1136 threadEnd[i] = 1;
1137 endInf[i].c = 0;
1139 gthreadNum = 1;
1140 fprintf(stderr, "Starting single thread for finishing assembly\n");
1141 count_pair_thread = count_pair/2;
1143 if( current_id[0].c > count_pair)
1144 count_pair = (current_id[0].c / count_pair + 2) * count_pair;
1145 is_platform = 0;
1147 read_ratio = current_read_ratio;
1148 old_avg_len = avg_len;
1149 ass_len = total_usedlen;
1151 return 0;
1154 void layout_contigs_bwt_multi(char* prefix, unsigned long long* contig_num, int method)
1156 int c=0, size = 0, i, j;
1157 clock_t start_time, now_time;
1158 char name[255];
1159 int max_node_num = MAXDEPTH * Read_length;
1160 len_dif_zero = 0; blank_cycle_time = 0;
1161 count_pair = pair_num * 1.0 / double(Expect_depth * 5); // G/500/2, with expect N50 >1k. then one window, one genome.
1163 count_pair_thread = count_pair/threadNum;
1164 gthreadNum = threadNum;
1165 increase = count_pair;
1167 buffer_size = count_pair_thread /2 / 10 /3; //10 buffers with thread num could fill in all the whole genome(N50=1k).
1168 if(buffer_size < 10)
1169 buffer_size = 10;
1171 fprintf(stderr, "Checking window for termination: %d and perthread %d and buffer size %d\n", count_pair, count_pair_thread, buffer_size);
1173 //find the starting library.
1174 while(FLAGS[size] == 0 && size < bwtCount)
1175 size++;
1176 if(size == bwtCount)
1178 fprintf(stderr, "You must at least set one bwt for seeding.\n");
1179 exit(6);
1182 start_time = clock();
1184 //define and initialization.
1186 ctgs = (Contig**)calloc(threadNum, sizeof(Contig*));
1187 ctg_num = (int*)calloc(threadNum, sizeof(int));
1188 tmpRead = (TmpRead**)calloc(threadNum, sizeof(TmpRead*));
1189 tmpCount = (int*)calloc(threadNum, sizeof(int));
1190 g_iters = (Iter**)calloc(threadNum, sizeof(Iter*));
1191 iterCount = (int*)calloc(threadNum, sizeof(int));
1193 g_seq = (char**)calloc(threadNum, sizeof(char*));
1194 g_nodes = (Node**)calloc(threadNum, sizeof(Node*));
1195 g_levs = (Lev**)calloc(threadNum, sizeof(Lev*));
1196 g_reads = (ReadId**)calloc(threadNum, sizeof(ReadId*));
1198 tmpbase = (Base**)calloc(threadNum, sizeof(Base*));
1199 sbase = (Base***)calloc(threadNum, sizeof(Base**));
1201 threads = (pthread_t*) calloc( threadNum, sizeof(pthread_t) );
1202 int threadId[threadNum];
1203 signal = (int*)calloc(threadNum, sizeof(int));
1205 conf_buffer = (Id**)calloc(threadNum, sizeof(Id*));
1206 conf_size = (int*)calloc(threadNum, sizeof(int));
1207 current_id = (Id*)calloc(threadNum, sizeof(Id));
1208 threadEnd = (int*)calloc(threadNum, sizeof(int));
1209 endInf = (EndInf*)calloc(threadNum, sizeof(EndInf));
1211 four_iter_used_read = (int*)calloc(threadNum, sizeof(int));
1212 four_iter_checked_read = (int*)calloc(threadNum, sizeof(int));
1213 four_iter = (int*)calloc(threadNum, sizeof(int));
1215 for(i=0; i< threadNum; i++)
1217 ctgs[i] = (Contig*)calloc(buffer_size+1, sizeof(Contig));
1218 conf_buffer[i] = (Id*)calloc(buffer_size+1, sizeof(Id));
1219 conf_size[i]=0;
1220 ctg_num[i] = 0;
1221 for(j=0; j< buffer_size; j++)
1223 ctgs[i][j].seq = (char*)calloc(MaxCtgLen, sizeof(char));
1224 //ctgs[i][j].qual = (char*)calloc(MaxCtgLen, sizeof(char));
1225 ctgs[i][j].uniq = (char*)calloc(MaxCtgLen, sizeof(char));
1228 tmpRead[i] = (TmpRead*)calloc(MaxReadCount, sizeof(TmpRead));
1229 tmpCount[i] = 0;
1231 g_iters[i] = (Iter*)calloc(MaxIterNum, sizeof(Iter));
1232 iterCount[i] = 0;
1233 for(j=0; j< MaxIterNum; j++)
1235 g_iters[i][j].seq = (char*)calloc(Read_length, sizeof(char));
1236 g_iters[i][j].b = (Base*)calloc(bwtCount, sizeof(Base));
1239 g_seq[i] = (char*)calloc(Read_length, sizeof(char));
1240 g_nodes[i] = (Node*)calloc(max_node_num, sizeof(Node));
1241 g_levs[i] = (Lev*)calloc(Read_length+1, sizeof(Lev));
1242 g_reads[i] = (ReadId*)calloc(MAXDEPTH, sizeof(ReadId));
1243 for(j=0; j< max_node_num; j++)
1245 g_nodes[i][j].seq = (char*) calloc(Read_length, sizeof(char));
1246 g_nodes[i][j].len = 0;
1247 g_nodes[i][j].conf = 0;
1248 g_nodes[i][j].b = (Base*) calloc(bwtCount, sizeof(Base));
1251 for(j=0; j< Read_length+1; j++)
1253 g_levs[i][j].c =(unsigned short*) calloc(5, sizeof(unsigned short));
1256 tmpbase[i] = (Base*)calloc(bwtCount, sizeof(Base));
1257 for(j=0; j< bwtCount; j++)
1259 tmpbase[i][j].saL = 0; tmpbase[i][j].saR = 0; tmpbase[i][j].saLL = 0; tmpbase[i][j].saRR = 0;
1260 tmpbase[i][j].plus = 0; tmpbase[i][j].minus = 0; tmpbase[i][j].depth = 0;
1263 sbase[i] = (Base**)calloc(4, sizeof(Base*));
1264 for(j=0; j< 4; j++)
1265 sbase[i][j] = (Base*)calloc(bwtCount, sizeof(Base));
1266 endInf[i].c = 0; endInf[i].ur = 0; endInf[i].uc = 0; endInf[i].len = 0; endInf[i].ulen = 0;
1268 fprintf(stderr, "After initial, check mem:\n");
1269 print_maxrss_and_time();
1270 //generate files.
1271 //sprintf(name, "%s.contig", prefix);
1272 //fci = fopen(name, "w");
1273 sprintf(name, "%s.contig.fa", prefix);
1274 fcs = fopen(name, "w");
1275 //sprintf(name, "%s.contig.fa.maskRep", prefix);
1276 //fcm = fopen(name, "w");
1277 //sprintf(name, "%s.contig.qual", prefix);
1278 //fcq = fopen(name, "w");
1279 sprintf(name, "%s.contig.log", prefix);
1280 logs = fopen(name, "w");
1282 now_time = clock();
1284 double startTime;
1285 double elapsedTime = 0, totalElapsedTime = 0, prevTime = 0;
1286 startTime = setStartTime();
1288 fprintf(stderr, "Initial time: %f seconds\n", (double)(now_time - start_time)/CLOCKS_PER_SEC );
1289 creatThrds (threads, threadId);
1291 //define for termination of assembly.
1292 initial_current(size);
1293 int cycle=0, to_single = 0;;
1294 while(is_all_end() == 0)
1296 //assembly per thread.
1297 sendSignal(1);
1299 elapsedTime = getElapsedTime(startTime) - totalElapsedTime;
1300 if(to_single == 0 && prevTime > 0 && elapsedTime > prevTime*Upper_bound)
1301 to_single = 1;
1303 totalElapsedTime += elapsedTime;
1304 if(prevTime == 0 || prevTime > elapsedTime)
1305 prevTime = elapsedTime;
1306 now_time = clock();
1308 if(RepeatMask == 1)
1309 print_ctgs(fcs, fcq, fcm, logs);
1310 else
1311 print_ctg(fcs, fcq, logs);
1313 //check terminate.
1314 if(is_terminate( to_single ) == 1)
1316 fprintf(stderr, "Extension finished, ReadUsedRate %f, TotalLength %lld, CPU time %f, ", read_ratio, ass_len, (double)(now_time - start_time)/CLOCKS_PER_SEC);
1317 fprintf(stderr, "Realtime "); printElapsedTime(stderr, FALSE, FALSE, TRUE, 2, totalElapsedTime);
1318 break;
1320 fprintf(stderr, "%d cycle buffer finished, ReadUsedRate %f, TotalLength %lld, CPUtime %f, ", cycle, read_ratio, ass_len, (double)(now_time - start_time)/CLOCKS_PER_SEC);
1321 fprintf(stderr, "Realtime "); printElapsedTime(stderr, FALSE, FALSE, TRUE, 2, elapsedTime);
1323 cycle++;
1324 //checking more library.
1325 if(is_all_end() == 1)
1327 size++;
1328 while(FLAGS[size] == 0 && size < bwtCount)
1329 size++;
1330 if(size >= bwtCount)
1331 break;
1332 fprintf(stderr, "Checking from library %d \n", size);
1333 initial_current(size);
1336 *contig_num = g_count;
1337 //fprintf(stderr, "All extension finished.\n");
1338 //free memory, thread and close file.
1340 sendSignal(2);
1341 //fprintf(stderr, "All thread freeed.\n");
1343 for(i=0; i< threadNum; i++)
1345 for(j=0; j< buffer_size; j++)
1347 free(ctgs[i][j].seq);
1348 //free(ctgs[i][j].qual);
1349 free(ctgs[i][j].uniq);
1352 for(j=0; j< MaxIterNum; j++)
1354 free(g_iters[i][j].seq);
1355 free(g_iters[i][j].b);
1358 for(j=0; j< max_node_num; j++)
1360 free(g_nodes[i][j].seq);
1361 free(g_nodes[i][j].b);
1364 for(j=0; j< Read_length+1; j++)
1366 free(g_levs[i][j].c);
1369 for(j=0; j< 4; j++)
1370 free(sbase[i][j]);
1373 for(i=0; i< threadNum; i++)
1375 free(ctgs[i]);
1376 free(tmpRead[i]);
1377 free(conf_buffer[i]);
1379 free(g_iters[i]);
1380 free(g_seq[i]);
1381 free(g_reads[i]);
1383 free(g_nodes[i]);
1384 free(g_levs[i]);
1385 free(tmpbase[i]);
1386 free(sbase[i]);
1388 free(conf_buffer);
1389 free(conf_size);
1390 free(current_id);
1391 free(threadEnd);
1393 free(four_iter_used_read);
1394 free(four_iter_checked_read);
1395 free(four_iter);
1397 free(ctgs);
1398 free(ctg_num);
1399 free(tmpRead);
1400 free(tmpCount);
1401 free(g_iters);
1402 free(iterCount);
1404 free(g_seq);
1405 free(g_nodes);
1406 free(g_levs);
1407 free(g_reads);
1409 free(tmpbase);
1410 free(sbase);
1411 free(threads);
1412 free(signal);
1413 free(endInf);
1415 //fclose(fci);
1416 fclose(fcs);
1417 //fclose(fcq);
1418 //fclose(fcm);
1419 fclose(logs);
1422 void contig_assembly(char* prefix, int method)
1424 char name[255];
1425 sprintf(name, "%s.num", prefix);
1427 pair_num =0;
1428 int i;
1429 for(i=0; i< bwtCount; i++)
1431 if(FLAGS[i] == 0)
1432 continue;
1433 pair_num += idx2BWT[i]->numReads;
1435 unsigned long long ctg_num = 0;
1436 fprintf(stderr, "There are totally %d read number for contig assembly\n", pair_num);
1438 layout_contigs_bwt_multi(prefix, &ctg_num, method);
1439 fprintf(stderr, "layout unique contig number: %d\n", ctg_num);
1440 fprintf(stderr, "Layout all contigs finished\n");