modified: makefile
[GalaxyCodeBases.git] / BGI / SOAPdenovo2 / standardPregraph / prlRead2path.c
blobf584e2a53150859278bcd233d88f626d639a4308
1 /*
2 * prlRead2path.c
4 * Copyright (c) 2008-2012 BGI-Shenzhen <soap at genomics dot org dot cn>.
6 * This file is part of SOAPdenovo.
8 * SOAPdenovo is free software: you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License as published by
10 * the Free Software Foundation, either version 3 of the License, or
11 * (at your option) any later version.
13 * SOAPdenovo is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License
19 * along with SOAPdenovo. If not, see <http://www.gnu.org/licenses/>.
23 #include <stdinc.h>
24 #include "newhash.h"
25 #include "kmerhash.h"
26 #include <extfunc.h>
27 #include <extvab.h>
29 #define preARCBLOCKSIZE 100000
30 #ifdef MER127
31 static const Kmer kmerZero = { 0, 0, 0, 0 };
32 #else
33 static const Kmer kmerZero = { 0, 0 };
34 #endif
36 static unsigned int * arcCounters;
37 static int buffer_size = 100000000;
38 static long long markCounter = 0;
39 static unsigned int * fwriteBuf;
40 static unsigned char * markerOnEdge; //edge occured times for each edge
42 //buffer related varibles for chop kmer
43 static int read_c;
44 static char ** rcSeq;
45 static char ** seqBuffer;
46 static int * lenBuffer;
48 //edge and (K+1)mer related variables
49 static preARC ** preArc_array;
50 static Kmer * mixBuffer; //kmer buffer; after searching, mixBuffer[j].low = node->l_links; 'node->l_links'
51 static boolean * flagArray; //indicate each item in mixBuffer whether it's a (K+1)mer
53 // kmer related variables
54 static char ** flags;
55 static int kmer_c;
56 static Kmer * kmerBuffer;
57 static ubyte8 * hashBanBuffer;
58 static kmer_t ** nodeBuffer; //kmer_t buffer related to 'kmerBuffer'
59 static boolean * smallerBuffer;
60 static int * indexArray;
62 static int * deletion; //read deletion number for each thread
64 static struct aiocb aio1;
65 static struct aiocb aio2;
66 static char * aioBuffer1;
67 static char * aioBuffer2;
68 static char * readBuffer1;
69 static char * readBuffer2;
71 static void parse1read ( int t, int threadID );
72 static void search1kmerPlus ( int j, unsigned char thrdID );
73 static void threadRoutine ( void * thrdID );
74 static void searchKmer ( int t, KmerSet * kset );
75 static void chopKmer4read ( int t, int threadID );
76 static void thread_wait ( pthread_t * threads );
77 static void thread_add1preArc ( unsigned int from_ed, unsigned int to_ed, unsigned int thrdID );
79 static void creatThrds ( pthread_t * threads, PARAMETER * paras )
81 unsigned char i;
82 int temp;
84 for ( i = 0; i < thrd_num; i++ )
86 //printf("to create %dth thread\n",(*(char *)&(threadID[i])));
87 if ( ( temp = pthread_create ( &threads[i], NULL, ( void * ) threadRoutine, & ( paras[i] ) ) ) != 0 )
89 fprintf ( stderr, "Create threads failed.\n" );
90 exit ( 1 );
94 fprintf ( stderr, "%d thread(s) initialized.\n", thrd_num );
97 static void threadRoutine ( void * para )
99 PARAMETER * prm;
100 int i, t, j, start, finish;
101 unsigned char id;
102 prm = ( PARAMETER * ) para;
103 id = prm->threadID;
105 //printf("%dth thread with task %d, hash_table %p\n",id,prm.task,prm.hash_table);
106 while ( 1 )
108 if ( * ( prm->selfSignal ) == 1 )
110 for ( i = 0; i < kmer_c; i++ )
112 //if((hashBanBuffer[i]&taskMask)!=prm.threadID)
113 if ( ( hashBanBuffer[i] % thrd_num ) != id )
115 continue;
118 searchKmer ( i, KmerSets[id] );
121 * ( prm->selfSignal ) = 0;
123 else if ( * ( prm->selfSignal ) == 2 )
125 for ( i = 0; i < read_c; i++ )
127 if ( i % thrd_num != id )
129 continue;
132 chopKmer4read ( i, id + 1 );
135 * ( prm->selfSignal ) = 0;
137 else if ( * ( prm->selfSignal ) == 3 )
139 // parse reads
140 for ( t = 0; t < read_c; t++ )
142 if ( t % thrd_num != id )
144 continue;
147 parse1read ( t, id + 1 );
150 * ( prm->selfSignal ) = 0;
152 else if ( * ( prm->selfSignal ) == 4 )
154 //printf("thread %d, reads %d splay kmerplus\n",id,read_c);
155 for ( t = 0; t < read_c; t++ )
157 start = indexArray[t];
158 finish = indexArray[t + 1];
159 #ifdef MER127
161 for ( j = start; j < finish; j++ )
163 if ( flagArray[j] == 0 )
165 if ( mixBuffer[j].low2 == 0 )
167 break;
170 else if ( hashBanBuffer[j] % thrd_num == id )
172 //fprintf(stderr,"thread %d search for ban %lld\n",id,hashBanBuffer[j]);
173 search1kmerPlus ( j, id );
177 if(flagArray[j]==0&&mixBuffer[j]==0)
178 break;
179 if(!flagArray[j]||(hashBanBuffer[j]%thrd_num)!=id)
180 continue;
181 search1kmerPlus(j,id);
185 #else
187 for ( j = start; j < finish; j++ )
189 if ( flagArray[j] == 0 )
191 if ( mixBuffer[j].low == 0 )
193 break;
196 else if ( hashBanBuffer[j] % thrd_num == id )
198 //fprintf(stderr,"thread %d search for ban %lld\n",id,hashBanBuffer[j]);
199 search1kmerPlus ( j, id );
203 if(flagArray[j]==0&&mixBuffer[j]==0)
204 break;
205 if(!flagArray[j]||(hashBanBuffer[j]%thrd_num)!=id)
206 continue;
207 search1kmerPlus(j,id);
211 #endif
214 * ( prm->selfSignal ) = 0;
216 else if ( * ( prm->selfSignal ) == 6 )
218 for ( t = 0; t < read_c; t++ )
220 start = indexArray[t];
221 finish = indexArray[t + 1];
222 #ifdef MER127
224 for ( j = start; j < finish - 1; j++ )
226 if ( mixBuffer[j].low2 == 0 || mixBuffer[j + 1].low2 == 0 )
228 break;
231 if ( mixBuffer[j].low2 % thrd_num != id )
233 continue;
236 thread_add1preArc ( mixBuffer[j].low2, mixBuffer[j + 1].low2, id );
239 #else
241 for ( j = start; j < finish - 1; j++ )
243 if ( mixBuffer[j].low == 0 || mixBuffer[j + 1].low == 0 )
245 break;
248 if ( mixBuffer[j].low % thrd_num != id )
250 continue;
253 thread_add1preArc ( mixBuffer[j].low, mixBuffer[j + 1].low, id );
256 #endif
259 * ( prm->selfSignal ) = 0;
261 else if ( * ( prm->selfSignal ) == 5 )
263 * ( prm->selfSignal ) = 0;
264 break;
267 usleep ( 1 );
271 static void chopKmer4read ( int t, int threadID )
273 char * src_seq = seqBuffer[t];
274 char * bal_seq = rcSeq[threadID];
275 int len_seq = lenBuffer[t];
276 int j, bal_j;
277 ubyte8 hash_ban, bal_hash_ban;
278 Kmer word, bal_word;
279 int index;
280 #ifdef MER127
281 word = kmerZero;
283 for ( index = 0; index < overlaplen; index++ )
285 word = KmerLeftBitMoveBy2 ( word );
286 word.low2 |= src_seq[index];
289 #else
290 word = kmerZero;
292 for ( index = 0; index < overlaplen; index++ )
294 word = KmerLeftBitMoveBy2 ( word );
295 word.low |= src_seq[index];
298 #endif
299 reverseComplementSeq ( src_seq, len_seq, bal_seq );
300 // complementary node
301 bal_word = reverseComplement ( word, overlaplen );
302 bal_j = len_seq - 0 - overlaplen; // 0;
303 index = indexArray[t];
305 if ( KmerSmaller ( word, bal_word ) )
307 hash_ban = hash_kmer ( word );
308 kmerBuffer[index] = word;
309 smallerBuffer[index] = 1;
310 hashBanBuffer[index++] = hash_ban;
312 else
314 bal_hash_ban = hash_kmer ( bal_word );
315 kmerBuffer[index] = bal_word;
316 smallerBuffer[index] = 0;
317 hashBanBuffer[index++] = bal_hash_ban;
320 //printf("%dth: %p with %p\n",kmer_c-1,bal_word,bal_hash_ban);
321 for ( j = 1; j <= len_seq - overlaplen; j++ )
323 word = nextKmer ( word, src_seq[j - 1 + overlaplen] );
324 bal_j = len_seq - j - overlaplen; // j;
325 bal_word = prevKmer ( bal_word, bal_seq[bal_j] );
327 if ( KmerSmaller ( word, bal_word ) )
329 hash_ban = hash_kmer ( word );
330 kmerBuffer[index] = word;
331 smallerBuffer[index] = 1;
332 hashBanBuffer[index++] = hash_ban;
333 //printf("%dth: %p with %p\n",kmer_c-1,word,hashBanBuffer[kmer_c-1]);
335 else
337 // complementary node
338 bal_hash_ban = hash_kmer ( bal_word );
339 kmerBuffer[index] = bal_word;
340 smallerBuffer[index] = 0;
341 hashBanBuffer[index++] = bal_hash_ban;
342 //printf("%dth: %p with %p\n",kmer_c-1,bal_word,hashBanBuffer[kmer_c-1]);
347 //splay for one kmer in buffer and save the node to nodeBuffer
348 static void searchKmer ( int t, KmerSet * kset )
350 kmer_t * node;
351 boolean found = search_kmerset ( kset, kmerBuffer[t], &node );
353 if ( !found )
355 fprintf ( stderr, "SearchKmer: kmer " );
356 PrintKmer ( stderr, kmerBuffer[t] );
357 fprintf ( stderr, " is not found.\n" );
359 #ifdef MER127
360 fprintf (stderr,"searchKmer: kmer %llx %llx %llx %llx is not found\n", kmerBuffer[t].high1, kmerBuffer[t].low1, kmerBuffer[t].high2, kmerBuffer[t].low2);
361 #else
362 fprintf (stderr,"searchKmer: kmer %llx %llx is not found\n", kmerBuffer[t].high, kmerBuffer[t].low);
363 #endif
367 nodeBuffer[t] = node;
370 static preARC * getPreArcBetween ( unsigned int from_ed, unsigned int to_ed )
372 preARC * parc;
373 parc = preArc_array[from_ed];
375 while ( parc )
377 if ( parc->to_ed == to_ed )
379 return parc;
382 parc = parc->next;
385 return parc;
388 static void thread_add1preArc ( unsigned int from_ed, unsigned int to_ed, unsigned int thrdID )
390 preARC * parc = getPreArcBetween ( from_ed, to_ed );
392 if ( parc )
394 parc->multiplicity++;
396 else
398 parc = prlAllocatePreArc ( to_ed, preArc_mem_managers[thrdID] );
399 arcCounters[thrdID]++;
400 parc->next = preArc_array[from_ed];
401 preArc_array[from_ed] = parc;
405 static void memoAlloc4preArc ()
407 unsigned int i;
408 preArc_array = ( preARC ** ) ckalloc ( ( num_ed + 1 ) * sizeof ( preARC * ) );
410 for ( i = 0; i <= num_ed; i++ )
412 preArc_array[i] = NULL;
416 static void memoFree4preArc ()
418 prlDestroyPreArcMem ();
420 if ( preArc_array )
422 free ( ( void * ) preArc_array );
426 static void output_arcs ( char * outfile )
428 unsigned int i;
429 char name[256];
430 FILE * outfp, *outfp2 = NULL;
431 preARC * parc;
432 sprintf ( name, "%s.preArc", outfile );
433 outfp = ckopen ( name, "w" );
435 if ( repsTie )
437 sprintf ( name, "%s.markOnEdge", outfile );
438 outfp2 = ckopen ( name, "w" );
441 markCounter = 0;
443 for ( i = 1; i <= num_ed; i++ )
445 if ( repsTie )
447 markCounter += markerOnEdge[i];
448 fprintf ( outfp2, "%d\n", markerOnEdge[i] );
451 parc = preArc_array[i];
453 if ( !parc )
455 continue;
458 fprintf ( outfp, "%u", i );
460 while ( parc )
462 fprintf ( outfp, " %u %u", parc->to_ed, parc->multiplicity );
463 parc = parc->next;
466 fprintf ( outfp, "\n" );
469 fclose ( outfp );
471 if ( repsTie )
473 fclose ( outfp2 );
474 // fprintf (stderr,"%lld marker(s) counted.\n", markCounter);
478 static void recordPathBin ( FILE * outfp )
480 int t, j, start, finish;
481 unsigned char counter;
483 for ( t = 0; t < read_c; t++ )
485 start = indexArray[t];
486 finish = indexArray[t + 1];
487 #ifdef MER127
489 if ( finish - start < 3 || mixBuffer[start].low2 == 0 || mixBuffer[start + 1].low2 == 0 || mixBuffer[start + 2].low2 == 0 )
491 continue;
494 counter = 0;
496 for ( j = start; j < finish; j++ )
498 if ( mixBuffer[j].low2 == 0 )
500 break;
503 fwriteBuf[counter++] = ( unsigned int ) mixBuffer[j].low2;
505 if ( markerOnEdge[mixBuffer[j].low2] < 255 )
507 markerOnEdge[mixBuffer[j].low2]++;
510 markCounter++;
513 #else
515 if ( finish - start < 3 || mixBuffer[start].low == 0 || mixBuffer[start + 1].low == 0 || mixBuffer[start + 2].low == 0 )
517 continue;
520 counter = 0;
522 for ( j = start; j < finish; j++ )
524 if ( mixBuffer[j].low == 0 )
526 break;
529 fwriteBuf[counter++] = ( unsigned int ) mixBuffer[j].low;
531 if ( markerOnEdge[mixBuffer[j].low] < 255 )
533 markerOnEdge[mixBuffer[j].low]++;
536 markCounter++;
539 #endif
540 fwrite ( &counter, sizeof ( char ), 1, outfp );
541 fwrite ( fwriteBuf, sizeof ( unsigned int ), ( int ) counter, outfp );
545 /*************************************************
546 Function:
547 search1kmerPlus
548 Description:
549 Searchs (k+1) kmer in hashset.
550 Input:
551 1. j: (k+1) mer's index
552 2. thrdID: thread id
553 Output:
554 None.
555 Return:
556 None.
557 *************************************************/
558 static void search1kmerPlus ( int j, unsigned char thrdID )
560 kmer_t * node;
561 boolean found = search_kmerset ( KmerSetsPatch[thrdID], mixBuffer[j], &node );
563 if ( !found )
566 fprintf(stderr,"kmerPlus %llx %llx (hashban %lld) not found, flag %d!\n",
567 mixBuffer[j].high,mixBuffer[j].low,hashBanBuffer[j],flagArray[j]);
569 mixBuffer[j] = kmerZero;
570 return;
571 } //else fprintf(stderr,"kmerPlus found\n");
573 #ifdef MER127
575 if ( smallerBuffer[j] )
577 mixBuffer[j].low2 = node->l_links;
579 else
581 mixBuffer[j].low2 = node->l_links + node->twin - 1;
584 #else
586 if ( smallerBuffer[j] )
588 mixBuffer[j].low = node->l_links;
590 else
592 mixBuffer[j].low = node->l_links + node->twin - 1;
595 #endif
598 static void parse1read ( int t, int threadID )
600 unsigned int j, retain = 0;
601 unsigned int edge_index = 0;
602 kmer_t * node;
603 boolean isSmaller;
604 Kmer wordplus, bal_wordplus;
605 unsigned int start, finish, pos;
606 Kmer prevKmer, currentKmer;
607 boolean IsPrevKmer = 0;
608 start = indexArray[t];
609 finish = indexArray[t + 1];
610 pos = start;
612 for ( j = start; j < finish; j++ )
614 node = nodeBuffer[j];
616 //extract edges or keep kmers
617 if ( ( node->deleted ) || ( node->linear && !node->inEdge ) ) // deleted or in a floating loop
619 if ( retain < 2 )
621 retain = 0;
622 pos = start;
624 else
626 break;
629 continue;
632 isSmaller = smallerBuffer[j];
634 if ( node->linear )
636 if ( isSmaller )
638 edge_index = node->l_links;
640 else
642 edge_index = node->l_links + node->twin - 1;
645 #ifdef MER127
647 if ( retain == 0 || IsPrevKmer )
649 retain++;
650 mixBuffer[pos].low2 = edge_index;
651 flagArray[pos++] = 0;
652 IsPrevKmer = 0;
654 else if ( edge_index != mixBuffer[pos - 1].low2 )
656 retain++;
657 mixBuffer[pos].low2 = edge_index;
658 flagArray[pos++] = 0;
661 #else
663 if ( retain == 0 || IsPrevKmer )
665 retain++;
666 mixBuffer[pos].low = edge_index;
667 flagArray[pos++] = 0;
668 IsPrevKmer = 0;
670 else if ( edge_index != mixBuffer[pos - 1].low )
672 retain++;
673 mixBuffer[pos].low = edge_index;
674 flagArray[pos++] = 0;
677 #endif
679 else
681 if ( isSmaller )
683 currentKmer = node->seq;
685 else
687 currentKmer = reverseComplement ( node->seq, overlaplen );
690 if ( IsPrevKmer )
692 retain++;
693 wordplus = KmerPlus ( prevKmer, lastCharInKmer ( currentKmer ) );
694 bal_wordplus = reverseComplement ( wordplus, overlaplen + 1 );
696 if ( KmerSmaller ( wordplus, bal_wordplus ) )
698 smallerBuffer[pos] = 1;
699 hashBanBuffer[pos] = hash_kmer ( wordplus );
700 mixBuffer[pos] = wordplus;
702 else
704 smallerBuffer[pos] = 0;
705 hashBanBuffer[pos] = hash_kmer ( bal_wordplus );
706 mixBuffer[pos] = bal_wordplus;
709 // fprintf(stderr,"%lld\n",hashBanBuffer[pos]);
710 flagArray[pos++] = 1;
713 IsPrevKmer = 1;
714 prevKmer = currentKmer;
719 for(j=start;j<pos;j++)
720 fprintf(stderr,"%d ",flagArray[j]);
721 fprintf(stderr,"\n");
723 if ( retain < 1 )
725 deletion[threadID]++;
728 if ( retain < 2 )
730 flagArray[start] = 0;
731 mixBuffer[start] = kmerZero;
732 return;
735 if ( ( pos - start ) != retain )
737 fprintf ( stderr, "Read %d, %d vs %d.\n", t, retain, edge_index - start );
740 if ( pos < finish )
742 flagArray[pos] = 0;
743 mixBuffer[pos] = kmerZero;
747 static void sendWorkSignal ( unsigned char SIG, unsigned char * thrdSignals )
749 int t;
751 for ( t = 0; t < thrd_num; t++ )
753 thrdSignals[t + 1] = SIG;
756 while ( 1 )
758 usleep ( 10 );
760 for ( t = 0; t < thrd_num; t++ )
761 if ( thrdSignals[t + 1] )
763 break;
766 if ( t == thrd_num )
768 break;
773 /*************************************************
774 Function:
775 prlRead2edge
776 Description:
777 Maps the reads to edges and builds pre-arcs between edges.
778 Input:
779 1. libfile: the reads config file
780 2. outfile: the output file prefix
781 Output:
782 None.
783 Return:
784 None.
785 *************************************************/
786 void prlRead2edge ( char * libfile, char * outfile )
788 char * cach1;
789 char * cach2;
790 unsigned char asm_ctg = 1;
791 long long i;
792 char name[256], *src_name, *next_name;
793 FILE * outfp = NULL;
794 int maxReadNum, libNo;
795 boolean flag, pairs = 0;
796 pthread_t threads[thrd_num];
797 unsigned char thrdSignal[thrd_num + 1];
798 PARAMETER paras[thrd_num];
799 //init
800 maxReadLen = 0;
801 maxNameLen = 256;
802 scan_libInfo ( libfile );
803 alloc_pe_mem ( num_libs );
805 if ( !maxReadLen )
807 maxReadLen = 100;
810 maxReadLen4all = maxReadLen;
811 fprintf ( stderr, "In file: %s, max seq len %d, max name len %d.\n", libfile, maxReadLen, maxNameLen );
813 if ( repsTie )
815 sprintf ( name, "%s.path", outfile );
816 outfp = ckopen ( name, "wb" );
819 src_name = ( char * ) ckalloc ( ( maxNameLen + 1 ) * sizeof ( char ) );
820 next_name = ( char * ) ckalloc ( ( maxNameLen + 1 ) * sizeof ( char ) );
821 kmerBuffer = ( Kmer * ) ckalloc ( buffer_size * sizeof ( Kmer ) );
822 mixBuffer = ( Kmer * ) ckalloc ( buffer_size * sizeof ( Kmer ) );
823 hashBanBuffer = ( ubyte8 * ) ckalloc ( buffer_size * sizeof ( ubyte8 ) );
824 nodeBuffer = ( kmer_t ** ) ckalloc ( buffer_size * sizeof ( kmer_t * ) );
825 smallerBuffer = ( boolean * ) ckalloc ( buffer_size * sizeof ( boolean ) );
826 flagArray = ( boolean * ) ckalloc ( buffer_size * sizeof ( boolean ) );
827 maxReadNum = buffer_size / ( maxReadLen - overlaplen + 1 );
828 //printf("buffer for at most %d reads\n",maxReadNum);
829 int maxAIOSize = 32768;
830 aioBuffer1 = ( char * ) ckalloc ( ( maxAIOSize ) * sizeof ( char ) );
831 aioBuffer2 = ( char * ) ckalloc ( ( maxAIOSize ) * sizeof ( char ) );
832 readBuffer1 = ( char * ) ckalloc ( ( maxAIOSize + ( maxReadLen * 4 + 1024 ) ) * sizeof ( char ) ); //(char *)ckalloc(maxAIOSize*sizeof(char)); //1024
833 readBuffer2 = ( char * ) ckalloc ( ( maxAIOSize + ( maxReadLen * 4 + 1024 ) ) * sizeof ( char ) ); //1024
834 cach1 = ( char * ) ckalloc ( ( maxReadLen * 4 + 1024 ) * sizeof ( char ) ); //1024
835 cach2 = ( char * ) ckalloc ( ( maxReadLen * 4 + 1024 ) * sizeof ( char ) ); //1024
836 memset ( cach1, '\0', ( maxReadLen * 4 + 1024 ) ); //1024
837 memset ( cach2, '\0', ( maxReadLen * 4 + 1024 ) ); //1024
838 seqBuffer = ( char ** ) ckalloc ( maxReadNum * sizeof ( char * ) );
839 lenBuffer = ( int * ) ckalloc ( maxReadNum * sizeof ( int ) );
840 indexArray = ( int * ) ckalloc ( ( maxReadNum + 1 ) * sizeof ( int ) );
842 for ( i = 0; i < maxReadNum; i++ )
844 seqBuffer[i] = ( char * ) ckalloc ( maxReadLen * sizeof ( char ) );
847 memoAlloc4preArc ();
848 flags = ( char ** ) ckalloc ( ( thrd_num + 1 ) * sizeof ( char * ) );
849 deletion = ( int * ) ckalloc ( ( thrd_num + 1 ) * sizeof ( int ) );
850 rcSeq = ( char ** ) ckalloc ( ( thrd_num + 1 ) * sizeof ( char * ) );
852 if ( repsTie )
854 markerOnEdge = ( unsigned char * ) ckalloc ( ( num_ed + 1 ) * sizeof ( unsigned char ) );
856 for ( i = 1; i <= num_ed; i++ )
858 markerOnEdge[i] = 0;
861 fwriteBuf = ( unsigned int * ) ckalloc ( ( maxReadLen - overlaplen + 1 ) * sizeof ( unsigned int ) );
864 thrdSignal[0] = 0;
866 if ( 1 )
868 preArc_mem_managers = ( MEM_MANAGER ** ) ckalloc ( thrd_num * sizeof ( MEM_MANAGER * ) );
869 arcCounters = ( unsigned int * ) ckalloc ( thrd_num * sizeof ( unsigned int ) );
871 for ( i = 0; i < thrd_num; i++ )
873 arcCounters[i] = 0;
874 preArc_mem_managers[i] = createMem_manager ( preARCBLOCKSIZE, sizeof ( preARC ) );
875 deletion[i + 1] = 0;
876 flags[i + 1] = ( char * ) ckalloc ( 2 * maxReadLen * sizeof ( char ) );
877 rcSeq[i + 1] = ( char * ) ckalloc ( maxReadLen * sizeof ( char ) );
878 thrdSignal[i + 1] = 0;
879 paras[i].threadID = i;
880 paras[i].mainSignal = &thrdSignal[0];
881 paras[i].selfSignal = &thrdSignal[i + 1];
884 creatThrds ( threads, paras );
887 if ( 1 )
889 deletion[0] = 0;
890 flags[0] = ( char * ) ckalloc ( 2 * maxReadLen * sizeof ( char ) );
891 rcSeq[0] = ( char * ) ckalloc ( maxReadLen * sizeof ( char ) );
894 kmer_c = n_solexa = read_c = i = libNo = readNumBack = gradsCounter = 0;
895 int t0, t1, t2, t3, t4, t5, t6;
896 t0 = t1 = t2 = t3 = t4 = t5 = t6 = 0;
897 time_t read_start, read_end, time_bef, time_aft;
898 time ( &read_start );
900 while ( openNextFile ( &libNo, pairs, asm_ctg ) )
902 if ( lib_array[libNo].curr_type == 4 )
904 int type = 0; //deside the PE reads is good or bad
906 while ( ( flag = read1seqInLibBam ( seqBuffer[read_c], next_name, & ( lenBuffer[read_c] ), &libNo, pairs, 1, &type ) ) != 0 )
908 if ( type == -1 ) //if the reads is bad, go back.
910 i--;
912 if ( lenBuffer[read_c - 1] >= overlaplen + 1 )
914 kmer_c -= lenBuffer[read_c - 1] - overlaplen + 1;
915 read_c--;
918 n_solexa -= 2;
919 continue;
922 if ( ( ++i ) % 100000000 == 0 )
924 fprintf ( stderr, "--- %lldth reads.\n", i );
927 if ( lenBuffer[read_c] < overlaplen + 1 )
929 continue;
932 //if(lenBuffer[read_c]>70)
933 // lenBuffer[read_c] = 70;
934 //else if(lenBuffer[read_c]>40)
935 // lenBuffer[read_c] = 40;
936 indexArray[read_c] = kmer_c;
937 kmer_c += lenBuffer[read_c] - overlaplen + 1;
938 read_c++;
940 if ( read_c == maxReadNum )
942 indexArray[read_c] = kmer_c;
943 time ( &read_end );
944 t0 += read_end - read_start;
945 time ( &time_bef );
946 sendWorkSignal ( 2, thrdSignal ); //chopKmer4read
947 time ( &time_aft );
948 t1 += time_aft - time_bef;
949 time ( &time_bef );
950 sendWorkSignal ( 1, thrdSignal ); //searchKmer
951 time ( &time_aft );
952 t2 += time_aft - time_bef;
953 time ( &time_bef );
954 sendWorkSignal ( 3, thrdSignal ); //parse1read
955 time ( &time_aft );
956 t3 += time_aft - time_bef;
957 time ( &time_bef );
958 sendWorkSignal ( 4, thrdSignal ); //search1kmerPlus
959 time ( &time_aft );
960 t4 += time_aft - time_bef;
961 time ( &time_bef );
962 sendWorkSignal ( 6, thrdSignal ); //thread_add1preArc
963 time ( &time_aft );
964 t5 += time_aft - time_bef;
965 time ( &time_bef );
967 //recordPreArc();
968 if ( repsTie )
970 recordPathBin ( outfp );
973 time ( &time_aft );
974 t6 += time_aft - time_bef;
975 //output_path(read_c,edge_no,flags,outfp);
976 kmer_c = 0;
977 read_c = 0;
978 time ( &read_start );
982 else if ( lib_array[libNo].curr_type == 1 || lib_array[libNo].curr_type == 2 )
984 initAIO ( &aio1, aioBuffer1, fileno ( lib_array[libNo].fp1 ), maxAIOSize );
985 initAIO ( &aio2, aioBuffer2, fileno ( lib_array[libNo].fp2 ), maxAIOSize );
986 int offset1, offset2, flag1, flag2, rt1, rt2;
987 offset1 = offset2 = 0;
988 rt1 = aio_read ( &aio1 );
989 rt2 = aio_read ( &aio2 );
990 flag1 = AIORead ( &aio1, &offset1, readBuffer1, cach1, &rt1, lib_array[libNo].curr_type );
991 flag2 = AIORead ( &aio2, &offset2, readBuffer2, cach2, &rt2, lib_array[libNo].curr_type );
993 if ( flag1 && flag2 )
995 int start1, start2, turn;
996 start1 = start2 = 0;
997 turn = 1;
999 while ( start1 < offset1 || start2 < offset2 )
1001 if ( turn == 1 )
1003 turn = 2;
1004 readseqInLib ( seqBuffer[read_c], next_name, & ( lenBuffer[read_c] ), readBuffer1, &start1, offset1, libNo );
1006 if ( ( ++i ) % 100000000 == 0 )
1007 { fprintf ( stderr, "--- %lldth reads.\n", i ); }
1009 if ( lenBuffer[read_c] < overlaplen + 1 )
1011 if ( start1 >= offset1 )
1013 start1 = 0;
1014 offset1 = 0;
1015 flag1 = AIORead ( &aio1, &offset1, readBuffer1, cach1, &rt1, lib_array[libNo].curr_type );
1018 continue;
1021 indexArray[read_c] = kmer_c;
1022 kmer_c += lenBuffer[read_c] - overlaplen + 1;
1023 read_c++;
1025 if ( start1 >= offset1 )
1027 start1 = 0;
1028 offset1 = 0;
1029 flag1 = AIORead ( &aio1, &offset1, readBuffer1, cach1, &rt1, lib_array[libNo].curr_type );
1032 if ( read_c == maxReadNum )
1034 indexArray[read_c] = kmer_c;
1035 time ( &read_end );
1036 t0 += read_end - read_start;
1037 time ( &time_bef );
1038 sendWorkSignal ( 2, thrdSignal ); //chopKmer4read
1039 time ( &time_aft );
1040 t1 += time_aft - time_bef;
1041 time ( &time_bef );
1042 sendWorkSignal ( 1, thrdSignal ); //searchKmer
1043 time ( &time_aft );
1044 t2 += time_aft - time_bef;
1045 time ( &time_bef );
1046 sendWorkSignal ( 3, thrdSignal ); //parse1read
1047 time ( &time_aft );
1048 t3 += time_aft - time_bef;
1049 time ( &time_bef );
1050 sendWorkSignal ( 4, thrdSignal ); //search1kmerPlus
1051 time ( &time_aft );
1052 t4 += time_aft - time_bef;
1053 time ( &time_bef );
1054 sendWorkSignal ( 6, thrdSignal ); //thread_add1preArc
1055 time ( &time_aft );
1056 t5 += time_aft - time_bef;
1057 time ( &time_bef );
1059 //recordPreArc();
1060 if ( repsTie )
1061 { recordPathBin ( outfp ); }
1063 time ( &time_aft );
1064 t6 += time_aft - time_bef;
1065 //output_path(read_c,edge_no,flags,outfp);
1066 kmer_c = 0;
1067 read_c = 0;
1068 time ( &read_start );
1071 continue;
1074 if ( turn == 2 )
1076 turn = 1;
1077 readseqInLib ( seqBuffer[read_c], next_name, & ( lenBuffer[read_c] ), readBuffer2, &start2, offset2, libNo );
1079 if ( ( ++i ) % 100000000 == 0 )
1080 { fprintf ( stderr, "--- %lldth reads.\n", i ); }
1082 if ( lenBuffer[read_c] < overlaplen + 1 )
1084 if ( ( flag2 == 2 ) && ( start2 >= offset2 ) )
1085 { break; }
1087 if ( start2 >= offset2 )
1089 start2 = 0;
1090 offset2 = 0;
1091 flag2 = AIORead ( &aio2, &offset2, readBuffer2, cach2, &rt2, lib_array[libNo].curr_type );
1094 continue;
1097 indexArray[read_c] = kmer_c;
1098 kmer_c += lenBuffer[read_c] - overlaplen + 1;
1099 read_c++;
1101 if ( ( flag2 == 2 ) && ( start2 >= offset2 ) )
1102 { break; }
1104 if ( start2 >= offset2 )
1106 start2 = 0;
1107 offset2 = 0;
1108 flag2 = AIORead ( &aio2, &offset2, readBuffer2, cach2, &rt2, lib_array[libNo].curr_type );
1111 if ( read_c == maxReadNum )
1113 indexArray[read_c] = kmer_c;
1114 time ( &read_end );
1115 t0 += read_end - read_start;
1116 time ( &time_bef );
1117 sendWorkSignal ( 2, thrdSignal ); //chopKmer4read
1118 time ( &time_aft );
1119 t1 += time_aft - time_bef;
1120 time ( &time_bef );
1121 sendWorkSignal ( 1, thrdSignal ); //searchKmer
1122 time ( &time_aft );
1123 t2 += time_aft - time_bef;
1124 time ( &time_bef );
1125 sendWorkSignal ( 3, thrdSignal ); //parse1read
1126 time ( &time_aft );
1127 t3 += time_aft - time_bef;
1128 time ( &time_bef );
1129 sendWorkSignal ( 4, thrdSignal ); //search1kmerPlus
1130 time ( &time_aft );
1131 t4 += time_aft - time_bef;
1132 time ( &time_bef );
1133 sendWorkSignal ( 6, thrdSignal ); //thread_add1preArc
1134 time ( &time_aft );
1135 t5 += time_aft - time_bef;
1136 time ( &time_bef );
1138 //recordPreArc();
1139 if ( repsTie )
1140 { recordPathBin ( outfp ); }
1142 time ( &time_aft );
1143 t6 += time_aft - time_bef;
1144 //output_path(read_c,edge_no,flags,outfp);
1145 kmer_c = 0;
1146 read_c = 0;
1147 time ( &read_start );
1150 continue;
1154 else
1156 fprintf(stderr, "Error: aio_read error.\n");
1159 else
1161 initAIO ( &aio1, aioBuffer1, fileno ( lib_array[libNo].fp1 ), maxAIOSize );
1162 int offset, flag1, rt;
1163 offset = 0;
1164 rt = aio_read ( &aio1 );
1166 while ( ( flag1 = AIORead ( &aio1, &offset, readBuffer1, cach1, &rt, lib_array[libNo].curr_type ) ) )
1168 int start = 0;
1170 while ( start < offset )
1172 readseqInLib ( seqBuffer[read_c], next_name, & ( lenBuffer[read_c] ), readBuffer1, &start, offset, libNo );
1174 if ( ( ++i ) % 100000000 == 0 )
1175 { fprintf ( stderr, "--- %lldth reads.\n", i ); }
1177 if ( lenBuffer[read_c] < overlaplen + 1 )
1178 { continue; }
1180 indexArray[read_c] = kmer_c;
1181 kmer_c += lenBuffer[read_c] - overlaplen + 1;
1182 read_c++;
1185 if ( read_c > maxReadNum - 1024 )
1187 indexArray[read_c] = kmer_c;
1188 time ( &read_end );
1189 t0 += read_end - read_start;
1190 time ( &time_bef );
1191 sendWorkSignal ( 2, thrdSignal ); //chopKmer4read
1192 time ( &time_aft );
1193 t1 += time_aft - time_bef;
1194 time ( &time_bef );
1195 sendWorkSignal ( 1, thrdSignal ); //searchKmer
1196 time ( &time_aft );
1197 t2 += time_aft - time_bef;
1198 time ( &time_bef );
1199 sendWorkSignal ( 3, thrdSignal ); //parse1read
1200 time ( &time_aft );
1201 t3 += time_aft - time_bef;
1202 time ( &time_bef );
1203 sendWorkSignal ( 4, thrdSignal ); //search1kmerPlus
1204 time ( &time_aft );
1205 t4 += time_aft - time_bef;
1206 time ( &time_bef );
1207 sendWorkSignal ( 6, thrdSignal ); //thread_add1preArc
1208 time ( &time_aft );
1209 t5 += time_aft - time_bef;
1210 time ( &time_bef );
1212 //recordPreArc();
1213 if ( repsTie )
1214 { recordPathBin ( outfp ); }
1216 time ( &time_aft );
1217 t6 += time_aft - time_bef;
1218 //output_path(read_c,edge_no,flags,outfp);
1219 kmer_c = 0;
1220 read_c = 0;
1221 time ( &read_start );
1224 if ( flag1 == 2 )
1225 { break; }
1230 fprintf ( stderr, "%lld read(s) processed.\n", i );
1231 // fprintf (stderr,"Time ReadingReads: %d,chopKmer4read: %d,searchKmer: %d,parse1read: %d,search1kmerPlus: %d,thread_add1preArc: %d,recordPathBin: %d\n", t0, t1, t2, t3, t4, t5, t6);
1232 fprintf ( stderr, "Time spent on:\n" );
1233 fprintf ( stderr, " importing reads: %ds,\n", t0 );
1234 fprintf ( stderr, " chopping reads to kmers: %ds,\n", t1 );
1235 fprintf ( stderr, " searching kmers: %ds,\n", t2 );
1236 fprintf ( stderr, " aligning reads to edges: %ds,\n", t3 );
1237 fprintf ( stderr, " searching (K+1)mers: %ds,\n", t4 );
1238 fprintf ( stderr, " adding pre-arcs: %ds,\n", t5 );
1239 fprintf ( stderr, " recording read paths: %ds.\n", t6 );
1241 if ( read_c )
1243 indexArray[read_c] = kmer_c;
1244 sendWorkSignal ( 2, thrdSignal ); //chopKmer4read
1245 sendWorkSignal ( 1, thrdSignal ); //searchKmer
1246 sendWorkSignal ( 3, thrdSignal ); //parse1read
1247 sendWorkSignal ( 4, thrdSignal ); //search1kmerPlus
1248 sendWorkSignal ( 6, thrdSignal ); //thread_add1preArc
1250 //recordPreArc();
1251 if ( repsTie )
1253 recordPathBin ( outfp );
1257 fprintf ( stderr, "%lld marker(s) output.\n", markCounter );
1258 sendWorkSignal ( 5, thrdSignal ); //over
1259 thread_wait ( threads );
1260 output_arcs ( outfile );
1261 memoFree4preArc ();
1263 if ( 1 ) // multi-threads
1265 arcCounter = 0;
1267 for ( i = 0; i < thrd_num; i++ )
1269 arcCounter += arcCounters[i];
1270 free ( ( void * ) flags[i + 1] );
1271 deletion[0] += deletion[i + 1];
1272 free ( ( void * ) rcSeq[i + 1] );
1276 if ( 1 )
1278 free ( ( void * ) flags[0] );
1279 free ( ( void * ) rcSeq[0] );
1282 fprintf ( stderr, "Reads alignment done, %d read(s) deleted, %lld pre-arc(s) added.\n", deletion[0], arcCounter );
1284 if ( repsTie )
1286 free ( ( void * ) markerOnEdge );
1287 free ( ( void * ) fwriteBuf );
1290 free ( ( void * ) arcCounters );
1291 free ( ( void * ) rcSeq );
1293 for ( i = 0; i < maxReadNum; i++ )
1295 free ( ( void * ) seqBuffer[i] );
1298 free ( ( void * ) seqBuffer );
1299 free ( ( void * ) lenBuffer );
1300 free ( ( void * ) indexArray );
1301 free ( ( void * ) flags );
1302 free ( ( void * ) deletion );
1303 free ( ( void * ) kmerBuffer );
1304 free ( ( void * ) mixBuffer );
1305 free ( ( void * ) smallerBuffer );
1306 free ( ( void * ) flagArray );
1307 free ( ( void * ) hashBanBuffer );
1308 free ( ( void * ) nodeBuffer );
1309 free ( ( void * ) src_name );
1310 free ( ( void * ) next_name );
1311 free ( ( void * ) aioBuffer1 );
1312 free ( ( void * ) aioBuffer2 );
1313 free ( ( void * ) readBuffer1 );
1314 free ( ( void * ) readBuffer2 );
1315 free ( ( void * ) cach1 );
1316 free ( ( void * ) cach2 );
1318 if ( gLineLen < maxReadLen )
1320 free ( ( void * ) gStr );
1321 gStr = NULL;
1324 if ( repsTie )
1326 fclose ( outfp );
1329 free_pe_mem ();
1330 free_libs ();
1333 static void thread_wait ( pthread_t * threads )
1335 int i;
1337 for ( i = 0; i < thrd_num; i++ )
1338 if ( threads[i] != 0 )
1340 pthread_join ( threads[i], NULL );