1 # cython: language_level=3
3 # Time-stamp: <2020-12-04 18:41:28 Tao Liu>
5 """Module for SAPPER BAMParser class
7 Copyright (c) 2017 Tao Liu <tliu4@buffalo.edu>
9 This code is free software; you can redistribute it and/or modify it
10 under the terms of the BSD License (see the file COPYING included
11 with the distribution).
16 @contact: tliu4@buffalo.edu
19 # ------------------------------------
21 # ------------------------------------
22 from cpython cimport bool
29 ctypedef np
.float32_t float32_t
30 ctypedef np
.int32_t int32_t
32 #from libc.math cimport log10, log, exp, M_LN10 #,fabs,log1p
33 #from libc.math cimport M_LN10
34 from math
import log1p
, exp
, log
36 LN10
= 2.3025850929940458
37 LN10_tenth
= 0.23025850929940458
39 @cython
.boundscheck
(False
) # turn off bounds-checking for entire function
40 @cython
.wraparound
(False
) # turn off negative index wrapping for entire function
41 cpdef
tuple CalModel_Homo
( np
.ndarray
[int32_t
, ndim
=1] top1_bq_T
, np
.ndarray
[int32_t
, ndim
=1] top1_bq_C
, np
.ndarray
[int32_t
, ndim
=1] top2_bq_T
, np
.ndarray
[int32_t
, ndim
=1] top2_bq_C
):
50 # Phred score is Phred = -10log_{10} E, where E is the error rate.
51 # to get the 1-E: 1-E = 1-exp( Phred/-10*M_LN10 ) = 1-exp( Phred * -LOG10_E_tenth )
52 for i
in range( top1_bq_T
.shape
[0] ):
53 lnL
+= log1p
( -exp
(-top1_bq_T
[ i
]*LN10_tenth
) )
54 for i
in range( top1_bq_C
.shape
[0] ):
55 lnL
+= log1p
( -exp
(-top1_bq_C
[ i
]*LN10_tenth
) )
57 for i
in range( top2_bq_T
.shape
[0] ):
58 lnL
+= log
( exp
(-top2_bq_T
[ i
]*LN10_tenth
) )
59 for i
in range( top2_bq_C
.shape
[0] ):
60 lnL
+= log
( exp
(-top2_bq_C
[ i
]*LN10_tenth
) )
62 BIC
= -2*lnL
# no free variable, no penalty
65 @cython
.boundscheck
(False
) # turn off bounds-checking for entire function
66 @cython
.wraparound
(False
) # turn off negative index wrapping for entire function
67 cpdef
tuple CalModel_Heter_noAS
( np
.ndarray
[int32_t
, ndim
=1] top1_bq_T
,np
.ndarray
[int32_t
, ndim
=1] top1_bq_C
,np
.ndarray
[int32_t
, ndim
=1] top2_bq_T
,np
.ndarray
[int32_t
, ndim
=1] top2_bq_C
):
77 int tn_T
, tn_C
, tn
# total observed NTs
78 double lnL_T
, lnL_C
# log likelihood for treatment and control
83 # total oberseved treatment reads from top1 and top2 NTs
84 tn_T
= top1_bq_T
.shape
[0] + top2_bq_T
.shape
[0]
87 raise Exception("Total number of treatment reads is 0!")
89 ( lnL_T
, k_T
) = GreedyMaxFunctionNoAS
( top1_bq_T
.shape
[0], top2_bq_T
.shape
[0], tn_T
, top1_bq_T
, top2_bq_T
)
94 tn_C
= top1_bq_C
.shape
[0] + top2_bq_C
.shape
[0]
99 ( lnL_C
, k_C
) = GreedyMaxFunctionNoAS
( top1_bq_C
.shape
[0], top2_bq_C
.shape
[0], tn_C
, top1_bq_C
, top2_bq_C
)
105 # we penalize big model depending on the number of reads/samples
111 BIC
+= log
( tn_T
) + log
( tn_C
)
116 @cython
.boundscheck
(False
) # turn off bounds-checking for entire function
117 @cython
.wraparound
(False
) # turn off negative index wrapping for entire function
118 cpdef
tuple CalModel_Heter_AS
( np
.ndarray
[int32_t
, ndim
=1] top1_bq_T
, np
.ndarray
[int32_t
, ndim
=1] top1_bq_C
, np
.ndarray
[int32_t
, ndim
=1] top2_bq_T
, np
.ndarray
[int32_t
, ndim
=1] top2_bq_C
, float max_allowed_ar
= 0.99 ):
129 int tn_T
, tn_C
, tn
# total observed NTs
130 double lnL_T
, lnL_C
# log likelihood for treatment and control
131 double AS_alleleratio
# allele ratio
136 #assert top2_bq_T.shape[0] + top2_bq_C.shape[0] > 0, "Total number of top2 nt should not be zero while using this function: CalModel_Heter_AS!"
139 tn_T
= top1_bq_T
.shape
[0] + top2_bq_T
.shape
[0]
142 raise Exception("Total number of treatment reads is 0!")
144 ( lnL_T
, k_T
, AS_alleleratio
) = GreedyMaxFunctionAS
( top1_bq_T
.shape
[0], top2_bq_T
.shape
[0], tn_T
, top1_bq_T
, top2_bq_T
, max_allowed_ar
)
145 #print ">>>",lnL_T, k_T, AS_alleleratio
150 tn_C
= top1_bq_C
.shape
[0] + top2_bq_C
.shape
[0]
155 # We assume control will not have allele preference
156 ( lnL_C
, k_C
) = GreedyMaxFunctionNoAS
( top1_bq_C
.shape
[0], top2_bq_C
.shape
[0], tn_C
, top1_bq_C
, top2_bq_C
)
162 # we penalize big model depending on the number of reads/samples
166 BIC
+= 2 * log
( tn_T
)
168 BIC
+= 2 * log
( tn_T
) + log
( tn_C
)
173 @cython
.boundscheck
(False
) # turn off bounds-checking for entire function
174 @cython
.wraparound
(False
) # turn off negative index wrapping for entire function
175 cdef tuple GreedyMaxFunctionAS
( int m
, int n
, int tn
, np
.ndarray
[int32_t
, ndim
=1] me
, np
.ndarray
[int32_t
, ndim
=1] ne
, float max_allowed_ar
= 0.99 ):
176 """Return lnL, k and alleleratio in tuple.
178 Note: I only translate Liqing's C++ code into pyx here. Haven't done any review.
181 double dnew
, dold
, rold
, rnew
185 double dl
, dr
, d0
, d1l
, d1r
189 if tn
== 1: # only 1 read; I don't expect this to be run...
190 dl
=calculate_ln
(m
,n
,tn
,me
,ne
,0,0);
191 dr
=calculate_ln
(m
,n
,tn
,me
,ne
,1,1);
199 elif m
== 0: #no top1 nt
200 return ( calculate_ln
( m
, n
, tn
, me
, ne
, 0, m
, max_allowed_ar
), m
, 1-max_allowed_ar
)
202 elif m
== tn
: #all reads are top1
203 return ( calculate_ln
( m
, n
, tn
, me
, ne
, 1, m
, max_allowed_ar
), m
, max_allowed_ar
)
207 d0
= calculate_ln
( m
, n
, tn
, me
, ne
, float(k0
)/tn
, k0
, max_allowed_ar
)
208 d1l
= calculate_ln
( m
, n
, tn
, me
, ne
, float(k0
-1)/tn
, k0
-1, max_allowed_ar
)
209 d1r
= calculate_ln
( m
, n
, tn
, me
, ne
, float(k0
+1)/tn
, k0
+1, max_allowed_ar
)
211 if d0
> d1l
-1e-8 and d0
> d1r
-1e-8:
218 rold
= float(k0
-1)/tn
219 while kold
> 1: #disable: when kold=1 still run, than knew=0 is the final run
221 rnew
= float(knew
)/tn
223 dnew
= calculate_ln
( m
,n
,tn
,me
,ne
,rnew
,knew
, max_allowed_ar
)
225 if(dnew
-1e-8 < dold
) :
232 if btemp
: #maximum L value is in [1,m-1];
235 return ( dold
, k
, ar
)
236 else: #L(k=0) is the max for [0,m-1]
239 return ( dold
, k
, ar
)
244 rold
= float(k0
+ 1)/tn
245 while kold
< tn
- 1: #//disable: when kold=tn-1 still run, than knew=tn is the final run
248 rnew
= float(knew
)/tn
250 dnew
= calculate_ln
( m
,n
,tn
,me
,ne
,rnew
,knew
, max_allowed_ar
)
252 if dnew
- 1e-8 < dold
:
259 if btemp
: #maximum L value is in [m+1,tn-1]
262 return ( dold
, k
, ar
)
263 else: #L(k=tn) is the max for [m+1,tn]
266 return ( dold
, k
, ar
)
268 raise Exception("error in GreedyMaxFunctionAS")
271 @cython
.boundscheck
(False
) # turn off bounds-checking for entire function
272 @cython
.wraparound
(False
) # turn off negative index wrapping for entire function
273 cdef tuple GreedyMaxFunctionNoAS
(int m
, int n
, int tn
, np
.ndarray
[int32_t
, ndim
=1] me
, np
.ndarray
[int32_t
, ndim
=1] ne
):
274 """Return lnL, and k in tuple.
276 Note: I only translate Liqing's C++ code into pyx here. Haven't done any review.
283 double bg_r
, dl
, dr
, d0
, d1l
, d1r
289 dl
= calculate_ln
( m
, n
, tn
, me
, ne
, bg_r
, 0)
290 dr
= calculate_ln
( m
, n
, tn
, me
, ne
, bg_r
, 1)
297 elif m
== 0: #no top1 nt
298 return ( calculate_ln
( m
, n
, tn
, me
, ne
, bg_r
, m
), m
)
300 elif m
== tn
: #all reads are top1
301 return ( calculate_ln
( m
, n
, tn
, me
, ne
, bg_r
, m
), m
)
309 d0
= calculate_ln
( m
, n
, tn
, me
, ne
, bg_r
, k0
)
310 d1l
= calculate_ln
( m
, n
, tn
, me
, ne
, bg_r
, k0
- 1)
311 d1r
= calculate_ln
( m
, n
, tn
, me
, ne
, bg_r
, k0
+ 1)
313 if d0
> d1l
- 1e-8 and d0
> d1r
- 1e-8:
319 while kold
>= 1: #//when kold=1 still run, than knew=0 is the final run
321 dnew
= calculate_ln
( m
, n
, tn
, me
, ne
, bg_r
, knew
)
322 if dnew
- 1e-8 < dold
:
328 if btemp
: #//maximum L value is in [1,m-1];
331 else: #//L(k=0) is the max for [0,m-1]
337 while kold
<= tn
- 1: #//when kold=tn-1 still run, than knew=tn is the final run
339 dnew
= calculate_ln
( m
, n
, tn
, me
, ne
, bg_r
, knew
)
340 if dnew
- 1e-8 < dold
:
346 if btemp
: #//maximum L value is in [m+1,tn-1]
349 else: #//L(k=tn) is the max for [m+1,tn]
353 raise Exception("error in GreedyMaxFunctionNoAS")
355 @cython
.boundscheck
(False
) # turn off bounds-checking for entire function
356 @cython
.wraparound
(False
) # turn off negative index wrapping for entire function
357 cdef calculate_ln
( int m
, int n
, int tn
, np
.ndarray
[int32_t
, ndim
=1] me
, np
.ndarray
[int32_t
, ndim
=1] ne
, double r
, int k
, float max_allowed_r
= 0.99):
358 """Calculate log likelihood given quality of top1 and top2, the ratio r and the observed k.
368 if r
> max_allowed_r
or r
< 1 - max_allowed_r
: # r is extremely high or
370 lnL
+= k
*log
( max_allowed_r
) + (tn
-k
)*log
( 1- max_allowed_r
) #-10000
372 lnL
+= k
*log
( r
) + (tn
-k
)*log
(1-r
)
374 if k
== 0 or k
== tn
: # it's entirely biased toward 1 allele
377 #lnL += k*log( max_allowed_r ) #-10000
381 lnL
+= log
(float(tn
-i
)/(k
-i
))
383 for i
in range( tn
-k
):
384 lnL
+= log
(float(tn
-i
)/(tn
-k
-i
))
387 e
= exp
( - me
[ i
] * LN10_tenth
)
388 lnL
+= log
((1-e
)*(float(k
)/tn
) + e
*(1-float(k
)/tn
))
391 e
= exp
( - ne
[ i
] * LN10_tenth
)
392 lnL
+= log
((1-e
)*(1-float(k
)/tn
) + e
*(float(k
)/tn
))
397 cpdef
int calculate_GQ
( double lnL1
, double lnL2
, double lnL3
):
398 """GQ1 = -10*log_{10}((L2+L3)/(L1+L2+L3))
403 double L1
, L2
, L3
, sum
, tmp
418 #if(L1<1e-110) L1=1e-110;
425 tmp
= ( L2
+ L3
)/sum
427 GQ_score
= (int)(-4.34294*log
(tmp
))
433 cpdef
int calculate_GQ_heterASsig
( double lnL1
, double lnL2
):
437 double L1
, L2
, sum
, tmp
440 #L1=exp(2.7182818,lnL1-lnL1)
442 L2
= exp
( lnL2
- lnL1
)
456 ASsig_score
= (int)(-4.34294*log
(tmp
))