modified: makefile
[GalaxyCodeBases.git] / BGI / SOAPdenovo2 / standardPregraph / prlHashReads.c
blob6a9158e21082f8f3c4204953a0ba0521e3556062
1 /*
2 * prlHashReads.c
4 * Copyright (c) 2008-2012 BGI-Shenzhen <soap at genomics dot org dot cn>.
6 * This file is part of SOAPdenovo.
8 * SOAPdenovo is free software: you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License as published by
10 * the Free Software Foundation, either version 3 of the License, or
11 * (at your option) any later version.
13 * SOAPdenovo is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License
19 * along with SOAPdenovo. If not, see <http://www.gnu.org/licenses/>.
23 #include "stdinc.h"
24 #include "newhash.h"
25 #include "kmerhash.h"
26 #include "extfunc.h"
27 #include "extvab.h"
29 //debugging variables
30 static long long * tips;
31 static long long * kmerCounter; //kmer number for each KmerSet[thread id]
33 static long long ** kmerFreq;
35 //buffer related varibles for chop kmer
36 static int read_c;
37 static char ** rcSeq; //sequence pointer for each thread
38 static char ** seqBuffer; //read buffer
39 static int * lenBuffer; //read length buffer
40 static int * indexArray; // the read's begin kmer 's kmer_c
42 //buffer related varibles for splay tree
43 static int buffer_size = 100000000; //buffer size for kmerBuffer...
44 static volatile int kmer_c; // kmer count number
45 static Kmer * kmerBuffer; //kmer buffer array
46 static ubyte8 * hashBanBuffer; //the buffered hash value for 'kmerBuffer'
47 static char * nextcBuffer, *prevcBuffer; //next char buffer , previous char buffer for 'kmerBuffer'
49 static struct aiocb aio1;
50 static struct aiocb aio2;
51 static char * aioBuffer1;
52 static char * aioBuffer2;
53 static char * readBuffer1;
54 static char * readBuffer2;
56 static void thread_mark ( KmerSet * set, unsigned char thrdID );
57 static void Mark1in1outNode ( unsigned char * thrdSignal );
58 static void thread_delow ( KmerSet * set, unsigned char thrdID );
59 static void deLowCov ( unsigned char * thrdSignal );
61 static void singleKmer ( int t, KmerSet * kset );
62 static void chopKmer4read ( int t, int threadID );
64 static void freqStat ( char * outfile );
66 static void threadRoutine ( void * para )
68 PARAMETER * prm;
69 int i;
70 unsigned char id;
71 prm = ( PARAMETER * ) para;
72 id = prm->threadID;
74 //printf("%dth thread with threadID %d, hash_table %p\n",id,prm.threadID,prm.hash_table);
75 while ( 1 )
77 if ( * ( prm->selfSignal ) == 1 )
79 for ( i = 0; i < kmer_c; i++ )
81 //if((unsigned char)(magic_seq(hashBanBuffer[i])%thrd_num)!=id)
82 //if((kmerBuffer[i]%thrd_num)!=id)
83 if ( ( hashBanBuffer[i] % thrd_num ) != id )
85 continue;
88 kmerCounter[id + 1]++;
89 singleKmer ( i, KmerSets[id] );
92 * ( prm->selfSignal ) = 0;
94 else if ( * ( prm->selfSignal ) == 2 )
96 for ( i = 0; i < read_c; i++ )
98 if ( i % thrd_num != id )
100 continue;
103 chopKmer4read ( i, id + 1 );
106 * ( prm->selfSignal ) = 0;
108 else if ( * ( prm->selfSignal ) == 3 )
110 * ( prm->selfSignal ) = 0;
111 break;
113 else if ( * ( prm->selfSignal ) == 4 )
115 thread_mark ( KmerSets[id], id );
116 * ( prm->selfSignal ) = 0;
118 else if ( * ( prm->selfSignal ) == 5 )
120 thread_delow ( KmerSets[id], id );
121 * ( prm->selfSignal ) = 0;
124 usleep ( 1 );
128 static void singleKmer ( int t, KmerSet * kset )
130 kmer_t * pos;
131 put_kmerset ( kset, kmerBuffer[t], prevcBuffer[t], nextcBuffer[t], &pos );
134 static void creatThrds ( pthread_t * threads, PARAMETER * paras )
136 unsigned char i;
137 int temp;
139 for ( i = 0; i < thrd_num; i++ )
141 //printf("to create %dth thread\n",(*(char *)&(threadID[i])));
142 if ( ( temp = pthread_create ( &threads[i], NULL, ( void * ) threadRoutine, & ( paras[i] ) ) ) != 0 )
144 fprintf ( stderr, "Create threads failed.\n" );
145 exit ( 1 );
149 fprintf ( stderr, "%d thread(s) initialized.\n", thrd_num );
152 static void thread_wait ( pthread_t * threads )
154 int i;
156 for ( i = 0; i < thrd_num; i++ )
157 if ( threads[i] != 0 )
159 pthread_join ( threads[i], NULL );
163 static void chopKmer4read ( int t, int threadID )
165 char * src_seq = seqBuffer[t];
166 char * bal_seq = rcSeq[threadID];
167 int len_seq = lenBuffer[t];
168 int j, bal_j;
169 ubyte8 hash_ban, bal_hash_ban;
170 Kmer word, bal_word;
171 int index;
172 char InvalidCh = 4;
173 #ifdef MER127
174 word.high1 = word.low1 = word.high2 = word.low2 = 0;
176 for ( index = 0; index < overlaplen; index++ )
178 word = KmerLeftBitMoveBy2 ( word );
179 word.low2 |= src_seq[index];
182 #else
183 word.high = word.low = 0;
185 for ( index = 0; index < overlaplen; index++ )
187 word = KmerLeftBitMoveBy2 ( word );
188 word.low |= src_seq[index];
191 #endif
192 reverseComplementSeq ( src_seq, len_seq, bal_seq );
193 // complementary node
194 bal_word = reverseComplement ( word, overlaplen );
195 bal_j = len_seq - overlaplen;
196 index = indexArray[t];
198 if ( KmerSmaller ( word, bal_word ) )
200 hash_ban = hash_kmer ( word );
201 hashBanBuffer[index] = hash_ban;
202 kmerBuffer[index] = word;
203 prevcBuffer[index] = InvalidCh;
204 nextcBuffer[index++] = src_seq[0 + overlaplen];
206 else
208 bal_hash_ban = hash_kmer ( bal_word );
209 hashBanBuffer[index] = bal_hash_ban;
210 kmerBuffer[index] = bal_word;
211 prevcBuffer[index] = bal_seq[bal_j - 1];
212 nextcBuffer[index++] = InvalidCh;
215 for ( j = 1; j <= len_seq - overlaplen; j++ )
217 word = nextKmer ( word, src_seq[j - 1 + overlaplen] );
218 bal_j = len_seq - j - overlaplen;
219 bal_word = prevKmer ( bal_word, bal_seq[bal_j] );
221 if ( KmerSmaller ( word, bal_word ) )
223 hash_ban = hash_kmer ( word );
224 hashBanBuffer[index] = hash_ban;
225 kmerBuffer[index] = word;
226 prevcBuffer[index] = src_seq[j - 1];
228 if ( j < len_seq - overlaplen )
230 nextcBuffer[index++] = src_seq[j + overlaplen];
232 else
234 nextcBuffer[index++] = InvalidCh;
237 //printf("%dth: %p with %p\n",kmer_c-1,word,hashBanBuffer[kmer_c-1]);
239 else
241 // complementary node
242 bal_hash_ban = hash_kmer ( bal_word );
243 hashBanBuffer[index] = bal_hash_ban;
244 kmerBuffer[index] = bal_word;
246 if ( bal_j > 0 )
248 prevcBuffer[index] = bal_seq[bal_j - 1];
250 else
252 prevcBuffer[index] = InvalidCh;
255 nextcBuffer[index++] = bal_seq[bal_j + overlaplen];
256 //printf("%dth: %p with %p\n",kmer_c-1,bal_word,hashBanBuffer[kmer_c-1]);
261 static void sendWorkSignal ( unsigned char SIG, unsigned char * thrdSignals )
263 int t;
265 for ( t = 0; t < thrd_num; t++ )
267 thrdSignals[t + 1] = SIG;
270 while ( 1 )
272 usleep ( 10 );
274 for ( t = 0; t < thrd_num; t++ )
275 if ( thrdSignals[t + 1] )
277 break;
280 if ( t == thrd_num )
282 break;
287 /*************************************************
288 Function:
289 prlRead2HashTable
290 Description:
291 1. Imports the reads from the lib file one by one.
292 2. Chops the reads into kmers and store them in KmerSets.
293 3. Removes the kmers with low coverage.
294 4. Marks the linear kmers.
295 5. Counts the kmer frequences.
296 Input:
297 1. libfile : the reads config file
298 2. outfile : the output file prefix
299 Output:
300 None.
301 Return:
302 1 if exits normally.
303 *************************************************/
304 boolean prlRead2HashTable ( char * libfile, char * outfile )
306 char * cach1;
307 char * cach2;
308 unsigned char asm_ctg = 1;
309 long long i;
310 char * next_name, name[256];
311 FILE * fo;
312 time_t start_t, stop_t;
313 int maxReadNum;
314 int libNo;
315 pthread_t threads[thrd_num];
316 unsigned char thrdSignal[thrd_num + 1];
317 PARAMETER paras[thrd_num];
318 boolean flag, pairs = 0;
319 WORDFILTER = createFilter ( overlaplen );
320 maxReadLen = 0;
321 maxNameLen = 256;
322 scan_libInfo ( libfile );
323 alloc_pe_mem ( num_libs );
325 if ( !maxReadLen )
327 maxReadLen = 100;
330 if ( gLineLen < maxReadLen )
332 gStr = ( char * ) ckalloc ( ( maxReadLen + 1 ) * sizeof ( char ) );
335 //init
336 maxReadLen4all = maxReadLen;
337 fprintf ( stderr, "In %s, %d lib(s), maximum read length %d, maximum name length %d.\n\n", libfile, num_libs, maxReadLen, maxNameLen );
338 next_name = ( char * ) ckalloc ( ( maxNameLen + 1 ) * sizeof ( char ) );
339 kmerBuffer = ( Kmer * ) ckalloc ( buffer_size * sizeof ( Kmer ) );
340 hashBanBuffer = ( ubyte8 * ) ckalloc ( buffer_size * sizeof ( ubyte8 ) );
341 prevcBuffer = ( char * ) ckalloc ( buffer_size * sizeof ( char ) );
342 nextcBuffer = ( char * ) ckalloc ( buffer_size * sizeof ( char ) );
343 maxReadNum = buffer_size / ( maxReadLen - overlaplen + 1 );
344 //printf("buffer size %d, max read len %d, max read num %d\n",buffer_size,maxReadLen,maxReadNum);
345 int maxAIOSize = 32768;
346 aioBuffer1 = ( char * ) ckalloc ( ( maxAIOSize ) * sizeof ( char ) );
347 aioBuffer2 = ( char * ) ckalloc ( ( maxAIOSize ) * sizeof ( char ) );
348 readBuffer1 = ( char * ) ckalloc ( ( maxAIOSize + ( maxReadLen * 4 + 1024 ) ) * sizeof ( char ) ); //(char *)ckalloc(maxAIOSize*sizeof(char)); //1024
349 readBuffer2 = ( char * ) ckalloc ( ( maxAIOSize + ( maxReadLen * 4 + 1024 ) ) * sizeof ( char ) ); //1024
350 cach1 = ( char * ) ckalloc ( ( maxReadLen * 4 + 1024 ) * sizeof ( char ) ); //1024
351 cach2 = ( char * ) ckalloc ( ( maxReadLen * 4 + 1024 ) * sizeof ( char ) ); //1024
352 memset ( cach1, '\0', ( maxReadLen * 4 + 1024 ) ); //1024
353 memset ( cach2, '\0', ( maxReadLen * 4 + 1024 ) ); //1024
354 seqBuffer = ( char ** ) ckalloc ( maxReadNum * sizeof ( char * ) );
355 lenBuffer = ( int * ) ckalloc ( maxReadNum * sizeof ( int ) );
356 indexArray = ( int * ) ckalloc ( maxReadNum * sizeof ( int ) );
358 for ( i = 0; i < maxReadNum; i++ )
360 seqBuffer[i] = ( char * ) ckalloc ( maxReadLen * sizeof ( char ) );
363 rcSeq = ( char ** ) ckalloc ( ( thrd_num + 1 ) * sizeof ( char * ) );
365 if ( 1 )
367 kmerCounter = ( long long * ) ckalloc ( ( thrd_num + 1 ) * sizeof ( long long ) );
368 KmerSets = ( KmerSet ** ) ckalloc ( thrd_num * sizeof ( KmerSet * ) );
369 ubyte8 init_size = 1024;
370 ubyte8 k = 0;
372 if ( initKmerSetSize )
374 #ifdef MER127
375 init_size = ( ubyte8 ) ( ( double ) initKmerSetSize * 1024.0f * 1024.0f * 1024.0f / ( double ) thrd_num / 40 );
376 #else
377 init_size = ( ubyte8 ) ( ( double ) initKmerSetSize * 1024.0f * 1024.0f * 1024.0f / ( double ) thrd_num / 24 ); //is it true?
378 #endif
382 ++k;
384 while ( k * 0xFFFFFFLLU < init_size );
387 for ( i = 0; i < thrd_num; i++ )
389 //KmerSets[i] = init_kmerset(1024,0.77f);
390 KmerSets[i] = init_kmerset ( ( ( initKmerSetSize ) ? ( k * 0xFFFFFFLLU ) : ( init_size ) ), 0.77f );
391 thrdSignal[i + 1] = 0;
392 paras[i].threadID = i;
393 paras[i].mainSignal = &thrdSignal[0];
394 paras[i].selfSignal = &thrdSignal[i + 1];
395 kmerCounter[i + 1] = 0;
396 rcSeq[i + 1] = ( char * ) ckalloc ( maxReadLen * sizeof ( char ) );
399 creatThrds ( threads, paras );
402 thrdSignal[0] = kmerCounter[0] = 0;
403 time ( &start_t );
404 kmer_c = n_solexa = read_c = i = libNo = readNumBack = gradsCounter = 0;
406 while ( openNextFile ( &libNo, pairs, asm_ctg ) )
408 //read bam file
409 if ( lib_array[libNo].curr_type == 4 )
411 int type = 0; //deside the PE reads is good or bad
413 while ( ( flag = read1seqInLibBam ( seqBuffer[read_c], next_name, & ( lenBuffer[read_c] ), &libNo, pairs, 1, &type ) ) != 0 )
415 if ( type == -1 ) //if the reads is bad, go back.
417 i--;
419 if ( lenBuffer[read_c - 1] >= overlaplen + 1 )
421 kmer_c -= lenBuffer[read_c - 1] - overlaplen + 1;
422 read_c--;
425 n_solexa -= 2;
426 continue;
429 if ( ( ++i ) % 100000000 == 0 )
430 { fprintf ( stderr, "--- %lldth reads.\n", i ); }
432 if ( lenBuffer[read_c] < 0 )
433 { fprintf ( stderr, "Read len %d.\n", lenBuffer[read_c] ); }
435 if ( lenBuffer[read_c] < overlaplen + 1 )
436 { continue; }
439 if(lenBuffer[read_c]>70)
440 lenBuffer[read_c] = 50;
441 else if(lenBuffer[read_c]>40)
442 lenBuffer[read_c] = 40;
444 indexArray[read_c] = kmer_c;
445 kmer_c += lenBuffer[read_c] - overlaplen + 1;
446 read_c++;
448 if ( read_c == maxReadNum )
450 kmerCounter[0] += kmer_c;
451 sendWorkSignal ( 2, thrdSignal ); //chopKmer4read
452 sendWorkSignal ( 1, thrdSignal ); //singleKmer
453 kmer_c = read_c = 0;
457 //read PE fasta or fastq
458 else if ( lib_array[libNo].curr_type == 1 || lib_array[libNo].curr_type == 2 )
460 initAIO ( &aio1, aioBuffer1, fileno ( lib_array[libNo].fp1 ), maxAIOSize );
461 initAIO ( &aio2, aioBuffer2, fileno ( lib_array[libNo].fp2 ), maxAIOSize );
462 int offset1, offset2, flag1, flag2, rt1, rt2;
463 offset1 = offset2 = 0;
464 rt1 = aio_read ( &aio1 );
465 rt2 = aio_read ( &aio2 );
466 flag1 = AIORead ( &aio1, &offset1, readBuffer1, cach1, &rt1, lib_array[libNo].curr_type );
467 flag2 = AIORead ( &aio2, &offset2, readBuffer2, cach2, &rt2, lib_array[libNo].curr_type );
469 if ( flag1 && flag2 )
471 int start1, start2, turn;
472 start1 = start2 = 0;
473 turn = 1;
475 while ( start1 < offset1 || start2 < offset2 )
477 if ( turn == 1 )
479 turn = 2;
480 readseqInLib ( seqBuffer[read_c], next_name, & ( lenBuffer[read_c] ), readBuffer1, &start1, offset1, libNo );
482 if ( ( ++i ) % 100000000 == 0 )
483 { fprintf ( stderr, "--- %lldth reads.\n", i ); }
485 if ( lenBuffer[read_c] < 0 )
486 { fprintf ( stderr, "Read len %d.\n", lenBuffer[read_c] ); }
488 if ( lenBuffer[read_c] < overlaplen + 1 )
490 if ( start1 >= offset1 )
492 start1 = 0;
493 offset1 = 0;
494 flag1 = AIORead ( &aio1, &offset1, readBuffer1, cach1, &rt1, lib_array[libNo].curr_type );
497 continue;
500 indexArray[read_c] = kmer_c;
501 kmer_c += lenBuffer[read_c] - overlaplen + 1;
502 read_c++;
504 if ( start1 >= offset1 )
506 start1 = 0;
507 offset1 = 0;
508 flag1 = AIORead ( &aio1, &offset1, readBuffer1, cach1, &rt1, lib_array[libNo].curr_type );
511 if ( read_c == maxReadNum )
513 kmerCounter[0] += kmer_c;
514 sendWorkSignal ( 2, thrdSignal ); //chopKmer4read
515 sendWorkSignal ( 1, thrdSignal ); //singleKmer
516 kmer_c = read_c = 0;
519 continue;
522 if ( turn == 2 )
524 turn = 1;
525 readseqInLib ( seqBuffer[read_c], next_name, & ( lenBuffer[read_c] ), readBuffer2, &start2, offset2, libNo );
527 if ( ( ++i ) % 100000000 == 0 )
528 { fprintf ( stderr, "--- %lldth reads.\n", i ); }
530 if ( lenBuffer[read_c] < 0 )
531 { fprintf ( stderr, "Read len %d.\n", lenBuffer[read_c] ); }
533 if ( lenBuffer[read_c] < overlaplen + 1 )
535 if ( ( flag2 == 2 ) && ( start2 >= offset2 ) )
536 { break; }
538 if ( start2 >= offset2 )
540 start2 = 0;
541 offset2 = 0;
542 flag2 = AIORead ( &aio2, &offset2, readBuffer2, cach2, &rt2, lib_array[libNo].curr_type );
545 continue;
548 indexArray[read_c] = kmer_c;
549 kmer_c += lenBuffer[read_c] - overlaplen + 1;
550 read_c++;
552 if ( ( flag2 == 2 ) && ( start2 >= offset2 ) )
553 { break; }
555 if ( start2 >= offset2 )
557 start2 = 0;
558 offset2 = 0;
559 flag2 = AIORead ( &aio2, &offset2, readBuffer2, cach2, &rt2, lib_array[libNo].curr_type );
562 if ( read_c == maxReadNum )
564 kmerCounter[0] += kmer_c;
565 sendWorkSignal ( 2, thrdSignal ); //chopKmer4read
566 sendWorkSignal ( 1, thrdSignal ); //singleKmer
567 kmer_c = read_c = 0;
570 continue;
574 else
576 fprintf(stderr, "Error: aio_read error.\n");
579 //read single fasta, single fastq and PE fasta in one file
580 else
582 initAIO ( &aio1, aioBuffer1, fileno ( lib_array[libNo].fp1 ), maxAIOSize );
583 int offset, flag1, rt;
584 offset = 0;
585 rt = aio_read ( &aio1 );
587 while ( ( flag1 = AIORead ( &aio1, &offset, readBuffer1, cach1, &rt, lib_array[libNo].curr_type ) ) )
589 int start = 0;
591 while ( start < offset )
593 readseqInLib ( seqBuffer[read_c], next_name, & ( lenBuffer[read_c] ), readBuffer1, &start, offset, libNo );
595 if ( ( ++i ) % 100000000 == 0 )
596 { fprintf ( stderr, "--- %lldth reads.\n", i ); }
598 if ( lenBuffer[read_c] < 0 )
599 { fprintf ( stderr, "Read len %d.\n", lenBuffer[read_c] ); }
601 if ( lenBuffer[read_c] < overlaplen + 1 )
602 { continue; }
604 indexArray[read_c] = kmer_c;
605 kmer_c += lenBuffer[read_c] - overlaplen + 1;
606 read_c++;
609 if ( read_c > maxReadNum - 1024 )
611 kmerCounter[0] += kmer_c;
612 sendWorkSignal ( 2, thrdSignal ); //chopKmer4read
613 sendWorkSignal ( 1, thrdSignal ); //singleKmer
614 kmer_c = read_c = 0;
617 if ( flag1 == 2 )
618 { break; }
623 if ( read_c )
625 kmerCounter[0] += kmer_c;
626 sendWorkSignal ( 2, thrdSignal ); //chopKmer4read
627 sendWorkSignal ( 1, thrdSignal ); //singleKmer
630 time ( &stop_t );
631 fprintf ( stderr, "Time spent on hashing reads: %ds, %lld read(s) processed.\n", ( int ) ( stop_t - start_t ), i );
633 //record insert size info
634 if ( pairs )
636 if ( gradsCounter )
637 { fprintf ( stderr, "%d pe insert size, the largest boundary is %lld.\n\n", gradsCounter, pes[gradsCounter - 1].PE_bound ); }
638 else
640 fprintf ( stderr, "No paired reads found.\n" );
643 sprintf ( name, "%s.peGrads", outfile );
644 fo = ckopen ( name, "w" );
645 fprintf ( fo, "grads&num: %d\t%lld\n", gradsCounter, n_solexa );
647 for ( i = 0; i < gradsCounter; i++ )
649 fprintf ( fo, "%d\t%lld\t%d\n", pes[i].insertS, pes[i].PE_bound, pes[i].rank );
652 fclose ( fo );
655 free_pe_mem ();
656 free_libs ();
658 if ( 1 )
660 unsigned long long alloCounter = 0;
661 unsigned long long allKmerCounter = 0;
663 for ( i = 0; i < thrd_num; i++ )
665 alloCounter += count_kmerset ( ( KmerSets[i] ) );
666 allKmerCounter += kmerCounter[i + 1];
667 free ( ( void * ) rcSeq[i + 1] );
670 fprintf ( stderr, "%lli node(s) allocated, %lli kmer(s) in reads, %lli kmer(s) processed.\n", alloCounter, kmerCounter[0], allKmerCounter );
673 free ( ( void * ) rcSeq );
674 free ( ( void * ) kmerCounter );
676 for ( i = 0; i < maxReadNum; i++ )
678 free ( ( void * ) seqBuffer[i] );
681 free ( ( void * ) seqBuffer );
682 free ( ( void * ) lenBuffer );
683 free ( ( void * ) indexArray );
684 free ( ( void * ) kmerBuffer );
685 free ( ( void * ) hashBanBuffer );
686 free ( ( void * ) nextcBuffer );
687 free ( ( void * ) prevcBuffer );
688 free ( ( void * ) next_name );
689 free ( ( void * ) aioBuffer1 );
690 free ( ( void * ) aioBuffer2 );
691 free ( ( void * ) readBuffer1 );
692 free ( ( void * ) readBuffer2 );
693 free ( ( void * ) cach1 );
694 free ( ( void * ) cach2 );
695 fprintf ( stderr, "done hashing nodes\n" );
697 if ( deLowKmer )
699 time ( &start_t );
700 deLowCov ( thrdSignal );
701 time ( &stop_t );
702 fprintf ( stderr, "Time spent on delowcvgNode: %ds.\n", ( int ) ( stop_t - start_t ) );
705 time ( &start_t );
706 Mark1in1outNode ( thrdSignal );
707 freqStat ( outfile );
708 time ( &stop_t );
709 fprintf ( stderr, "Time spent on marking linear nodes: %ds.\n", ( int ) ( stop_t - start_t ) );
710 sendWorkSignal ( 3, thrdSignal ); //exit
711 thread_wait ( threads );
712 return 1;
715 void initAIO ( struct aiocb * aio, char * buf, int fd, int size )
717 bzero ( aio, sizeof ( struct aiocb ) );
718 aio->aio_buf = ( void * ) buf;
719 aio->aio_fildes = fd;
720 aio->aio_nbytes = size;
721 aio->aio_offset = 0;
724 int AIORead ( struct aiocb * mycb, int * offset, char * buf, char * cach, int * rt, int curr_type )
726 int i, i2, i3, j;
727 int num;
728 size_t mode, get, max_list;
730 // rt = aio_read(mycb);
731 if ( *rt == 0 )
733 struct aiocb * aiocb_list[1];
734 aiocb_list[0] = mycb;
735 max_list = 1;
737 while ( 1 )
739 mode = aio_suspend ( ( const struct aiocb * const * ) aiocb_list, max_list, NULL );
741 if ( mode == -1 )
743 if ( errno != EAGAIN && errno != EINTR )
745 fprintf ( stderr, "Error:%s.\n", errno );
746 return 0;
748 else
749 { continue; }
751 else
753 //while(aio_error(mycb) == EINPROGRESS);
754 get = aio_return ( mycb );
755 j = strlen ( cach );
757 if ( get > 0 )
759 char * temp = ( char * ) ( ( *mycb ).aio_buf );
761 if ( ( get % 32768 ) != 0 )
763 strcpy ( buf, cach );
764 memcpy ( &buf[j], temp, get );
765 memset ( cach, '\0', j );
766 //printf("%s",buf);
767 *offset = j + get;
768 return 2;
771 if ( ( curr_type == 2 ) || ( curr_type == 6 ) )
773 num = 0;
775 for ( i = get - 1; ( temp[i] != '@' ) || ( temp[i - 1] != '\n' ); i-- )
777 if ( temp[i] == '\n' ) {num++;}
780 if ( num <= 1 )
782 for ( i2 = i - 2; temp[i2] != '\n'; i2-- ) { ; }
784 if ( temp[i2 + 1] == '+' )
786 for ( i2 = i2 - 1; temp[i2] != '\n'; i2-- ) { ; }
788 if ( temp[i2 + 1] != '+' ) {for ( i = i2 - 1; ( temp[i] != '@' ) || ( temp[i - 1] != '\n' ); i-- ) { ; }}
792 else if ( ( curr_type == 1 ) || ( curr_type == 3 ) || ( curr_type == 5 ) )
793 for ( i = get - 1; temp[i] != '>'; i-- ) { ; }
795 //for (i = get - 1; temp[i] != '>' && temp[i] != '@'; i--) ;
796 strcpy ( buf, cach );
797 memcpy ( &buf[j], temp, i );
798 //printf("%s",buf);
799 *offset = i + j;
800 memset ( cach, '\0', j );
801 memcpy ( cach, &temp[i], get - i );
802 ( *mycb ).aio_offset += get;
803 *rt = aio_read ( mycb );
804 return 1;
806 else
808 fprintf(stderr, "Error: aio_return error.\n");
810 /*else
812 char *temp = (char *)((*mycb).aio_buf);
813 strcpy(buf,cach);
814 strcpy(&buf[j],temp);
815 *offset = j + get;
816 return 2;
817 } */
821 else
823 fprintf(stderr, "Error: (*rt != 0) in AIORead.\n");
826 return 0;
829 boolean openNextFile ( int * libNo, boolean pairs, unsigned char asm_ctg )
831 int i = *libNo;
832 int prevLib = i;
834 if ( lib_array[i].fp1 )
835 { closeFp1InLab ( i ); }
837 if ( lib_array[i].fp2 )
838 { closeFp2InLab ( i ); }
840 *libNo = nextValidIndex ( i, pairs, asm_ctg );
841 i = *libNo;
843 if ( lib_array[i].rd_len_cutoff > 0 )
844 { maxReadLen = lib_array[i].rd_len_cutoff < maxReadLen4all ? lib_array[i].rd_len_cutoff : maxReadLen4all; }
845 else
846 { maxReadLen = maxReadLen4all; }
848 //record insert size info
849 //printf("from lib %d to %d, read %lld to %ld\n",prevLib,i,readNumBack,n_solexa);
850 if ( pairs && i != prevLib )
852 if ( readNumBack < n_solexa )
854 pes[gradsCounter].PE_bound = n_solexa;
855 pes[gradsCounter].rank = lib_array[prevLib].rank;
856 pes[gradsCounter].pair_num_cut = lib_array[prevLib].pair_num_cut;
857 pes[gradsCounter++].insertS = lib_array[prevLib].avg_ins;
858 readNumBack = n_solexa;
862 if ( i >= num_libs )
863 { return 0; }
865 openFileInLib ( i );
866 return 1;
869 static void thread_delow ( KmerSet * set, unsigned char thrdID )
871 int i, in_num, out_num, cvgSingle;
872 int l_cvg, r_cvg;
873 kmer_t * rs;
874 set->iter_ptr = 0;
876 while ( set->iter_ptr < set->size )
878 if ( !is_kmer_entity_null ( set->flags, set->iter_ptr ) )
880 in_num = out_num = l_cvg = r_cvg = 0;
881 rs = set->array + set->iter_ptr;
883 for ( i = 0; i < 4; i++ )
885 cvgSingle = get_kmer_left_cov ( *rs, i );
887 if ( cvgSingle > 0 && cvgSingle <= deLowKmer )
889 set_kmer_left_cov ( *rs, i, 0 );
892 cvgSingle = get_kmer_right_cov ( *rs, i );
894 if ( cvgSingle > 0 && cvgSingle <= deLowKmer )
896 set_kmer_right_cov ( *rs, i, 0 );
900 if ( rs->l_links == 0 && rs->r_links == 0 )
902 rs->deleted = 1;
903 tips[thrdID]++;
907 set->iter_ptr++;
910 //printf("%lld single nodes, %lld linear\n",counter,tips[thrdID]);
913 static void deLowCov ( unsigned char * thrdSignal )
915 int i;
916 long long counter = 0;
917 tips = ( long long * ) ckalloc ( thrd_num * sizeof ( long long ) );
919 for ( i = 0; i < thrd_num; i++ )
921 tips[i] = 0;
924 sendWorkSignal ( 5, thrdSignal ); //mark linear nodes
926 for ( i = 0; i < thrd_num; i++ )
928 counter += tips[i];
931 free ( ( void * ) tips );
932 fprintf ( stderr, "%lld kmer(s) removed.\n", counter );
935 static void thread_mark ( KmerSet * set, unsigned char thrdID )
937 int i, in_num, out_num, cvgSingle;
938 int l_cvg, r_cvg;
939 kmer_t * rs;
940 long long counter = 0;
941 set->iter_ptr = 0;
943 while ( set->iter_ptr < set->size )
945 if ( !is_kmer_entity_null ( set->flags, set->iter_ptr ) )
947 in_num = out_num = l_cvg = r_cvg = 0;
948 rs = set->array + set->iter_ptr;
950 for ( i = 0; i < 4; i++ )
952 cvgSingle = get_kmer_left_cov ( *rs, i );
954 if ( cvgSingle > 0 )
956 in_num++;
957 l_cvg += cvgSingle;
960 cvgSingle = get_kmer_right_cov ( *rs, i );
962 if ( cvgSingle > 0 )
964 out_num++;
965 r_cvg += cvgSingle;
969 if ( rs->single )
971 kmerFreq[thrdID][1]++;
972 counter++;
974 else
976 kmerFreq[thrdID][ ( l_cvg > r_cvg ? l_cvg : r_cvg )]++;
979 if ( in_num == 1 && out_num == 1 )
981 rs->linear = 1;
982 tips[thrdID]++;
986 set->iter_ptr++;
989 //printf("%lld single nodes, %lld linear\n",counter,tips[thrdID]);
992 static void Mark1in1outNode ( unsigned char * thrdSignal )
994 int i;
995 long long counter = 0;
996 tips = ( long long * ) ckalloc ( thrd_num * sizeof ( long long ) );
997 kmerFreq = ( long long ** ) ckalloc ( thrd_num * sizeof ( long long * ) );
999 for ( i = 0; i < thrd_num; i++ )
1001 kmerFreq[i] = ( long long * ) ckalloc ( 257 * sizeof ( long long ) );
1002 memset ( kmerFreq[i], 0, 257 * sizeof ( long long ) );
1003 tips[i] = 0;
1006 sendWorkSignal ( 4, thrdSignal ); //mark linear nodes
1008 for ( i = 0; i < thrd_num; i++ )
1010 counter += tips[i];
1013 free ( ( void * ) tips );
1014 fprintf ( stderr, "%lld linear node(s) marked.\n", counter );
1017 static void freqStat ( char * outfile )
1019 FILE * fo;
1020 char name[256];
1021 int i, j;
1022 long long sum;
1023 sprintf ( name, "%s.kmerFreq", outfile );
1024 fo = ckopen ( name, "w" );
1026 for ( i = 1; i < 256; i++ )
1028 sum = 0;
1030 for ( j = 0; j < thrd_num; j++ )
1032 sum += kmerFreq[j][i];
1035 fprintf ( fo, "%lld\n", sum );
1038 for ( i = 0; i < thrd_num; i++ )
1040 free ( ( void * ) kmerFreq[i] );
1043 free ( ( void * ) kmerFreq );
1044 fclose ( fo );