1 # cython: language_level=3
3 # Time-stamp: <2024-10-14 17:04:27 Tao Liu>
7 This code is free software; you can redistribute it and/or modify it
8 under the terms of the BSD License (see the file LICENSE included with
11 # ------------------------------------
13 # ------------------------------------
15 from MACS3
.Signal
.Prob
import pnorm2
16 from MACS3
.Signal
.BedGraph
import bedGraphTrackI
17 from MACS3
.Signal
.Region
import Regions
18 from MACS3
.Utilities
.Logger
import logging
logger = logging.getLogger(__name__)
# Short aliases used by the functions in this module for logging.
debug = logger.debug
info = logger.info
24 # ------------------------------------
26 # ------------------------------------
def get_weighted_density(x: cython.int, m: cython.float, v: cython.float,
                         w: cython.float) -> cython.float:
    """Return the weighted gaussian density of the observed value.

    Parameters:
      1. x: the observed value
      2. m: the mean of gaussian
      3. v: the variance of the gaussian
      4. w: the weight

    Return value: w * pnorm2(x, m, v), i.e. the gaussian likelihood
    of x scaled by the weight w.
    """
    return w * pnorm2(float(x), m, v)
45 # ------------------------------------
47 # ------------------------------------
48 # ------------------------------------
50 # ------------------------------------
def generate_weight_mapping(fraglen_list: list, means: list, stddevs: list,
                            min_frag_p: cython.float = 0.001) -> list:
    """Generate weights for each fragment length in short, mono, di,
    and tri signal tracks.

    Parameters:
      1. fraglen_list: fragment lengths (ints) to assign weights to
      2. means: the four gaussian means, in the order
         [short, mono, di, tri]
      3. stddevs: the four gaussian standard deviations, same order
      4. min_frag_p: minimum likelihood for a fragment length to be
         assigned to any of the four distributions (default: 0.001)

    return: list of four dictionaries, with key as fraglen and value
    as the weight.

    ret[0] -- dictionary for short
    ret[1] -- dictionary for mono
    ret[2] -- dictionary for di
    ret[3] -- dictionary for tri
    """
    assert len(means) == 4
    assert len(stddevs) == 4
    [m_s, m_m, m_d, m_t] = means
    # pnorm2 takes variances, so square the standard deviations
    [v_s, v_m, v_d, v_t] = [x**2 for x in stddevs]
    ret_mapping = [{}, {}, {}, {}]
    for fl in fraglen_list:
        p_s = pnorm2(float(fl), m_s, v_s)
        p_m = pnorm2(float(fl), m_m, v_m)
        p_d = pnorm2(float(fl), m_d, v_d)
        p_t = pnorm2(float(fl), m_t, v_t)
        s = p_s + p_m + p_d + p_t
        if p_s < min_frag_p and p_m < min_frag_p and p_d < min_frag_p and p_t < min_frag_p:
            # we exclude the fragment which can't be assigned to
            # short, mono, di-nuc, and tri-nuc (likelihood <
            # min_frag_p, default:0.001) Normally this fragment is too
            # large. We exclude these fragment by setting all weights
            # to zero.
            debug(f"The fragment length {fl} can't be assigned to either distribution so will be excluded!")
            ret_mapping[0][fl] = 0
            ret_mapping[1][fl] = 0
            ret_mapping[2][fl] = 0
            ret_mapping[3][fl] = 0
        else:
            # normalize the four likelihoods so the weights sum to 1
            ret_mapping[0][fl] = p_s / s
            ret_mapping[1][fl] = p_m / s
            ret_mapping[2][fl] = p_d / s
            ret_mapping[3][fl] = p_t / s
    return ret_mapping
def generate_digested_signals(petrack, weight_mapping: list) -> list:
    """Generate digested pileup signals (four tracks) using weight mapping

    Parameters:
      1. petrack: paired-end track object providing
         pileup_bdg_hmmr(weight_mapping) -- assumed to return a list
         of four dicts keyed by chromosome with p-v arrays as values;
         TODO confirm against caller
      2. weight_mapping: list of four fraglen->weight dictionaries,
         as produced by generate_weight_mapping

    return: list of four bedGraphTrackI objects built from the
    digested signals.

    ret[0] -- track for short
    ret[1] -- track for mono
    ret[2] -- track for di
    ret[3] -- track for tri
    """
    ret_digested_signals: list
    ret_bedgraphs: list
    certain_signals: dict

    ret_digested_signals = petrack.pileup_bdg_hmmr(weight_mapping)
    ret_bedgraphs = []
    for i in range(4):          # yes I hardcoded 4!
        certain_signals = ret_digested_signals[i]
        bdg = bedGraphTrackI()
        # add chromosomes in a deterministic (sorted) order
        for chrom in sorted(certain_signals.keys()):
            bdg.add_chrom_data_PV(chrom, certain_signals[chrom])
        ret_bedgraphs.append(bdg)
    return ret_bedgraphs
def extract_signals_from_regions(signals: list, regions, binsize: cython.int = 10,
                                 hmm_type: str = 'gaussian') -> list:
    """Extract training data for the HMM from the given regions.

    We will take regions in peaks, create a bedGraphTrackI with
    binned regions in peaks, then let them overlap with signals to
    create a list (4) of value arrays.

    Parameters:
      1. signals: list of four bedGraphTrackI signal tracks
      2. regions: a Regions object describing the candidate regions
      3. binsize: bin width used to chop regions (default: 10)
      4. hmm_type: 'gaussian' keeps float values; 'poisson' casts the
         values to int since poisson can only take int values as input

    return: [ret_training_bins, ret_training_data,
    ret_training_lengths] where ret_training_data holds one
    four-value row per bin and ret_training_lengths holds the number
    of consecutive bins belonging to each region.
    """
    regionsbdg = _make_bdg_of_bins_from_regions(regions, binsize)
    debug('# extract_signals_from_regions: regionsbdg completed')
    # let the four signal tracks overlap with the binned regions
    extracted_positions = []
    extracted_data = []
    extracted_len = []
    for signaltrack in signals:  # four signal tracks
        # signaltrack is bedGraphTrackI object
        [positions, values, lengths] = signaltrack.extract_value_hmmr(regionsbdg)
        extracted_positions.append(positions)
        extracted_data.append(values)
        extracted_len.append(lengths)
    debug('# extract_signals_from_regions: extracted positions, data, len')
    ret_training_bins = []
    ret_training_data = []
    ret_training_lengths = []

    nn = len(extracted_data[0])
    # all four tracks must yield the same number of bins
    assert nn == len(extracted_data[1])
    assert nn == len(extracted_data[2])
    assert nn == len(extracted_data[3])

    # extracted_len holds a per-bin region label; a label change marks
    # the boundary between two regions, so we run-length encode it
    # into ret_training_lengths. (Reconstructed bookkeeping -- the
    # reset/increment statements were not visible; verify against the
    # upstream MACS3 source.)
    counter = 0
    prev_c = extracted_len[0][0]
    if hmm_type == "gaussian":
        for i in range(nn):
            ret_training_bins.append(extracted_positions[0][i])
            # floor at 0.0001 to avoid zeros in the gaussian model
            ret_training_data.append(
                [max(0.0001, extracted_data[0][i]),
                 max(0.0001, extracted_data[1][i]),
                 max(0.0001, extracted_data[2][i]),
                 max(0.0001, extracted_data[3][i])])
            c = extracted_len[0][i]
            if counter != 0 and c != prev_c:
                # region boundary: record the previous region's length
                ret_training_lengths.append(counter)
                counter = 0
            prev_c = c
            counter += 1
        debug('# extract_signals_from_regions: ret_training bins, data, lengths - gaussian')
    # poisson can only take int values as input
    if hmm_type == "poisson":
        for i in range(nn):
            ret_training_bins.append(extracted_positions[0][i])
            ret_training_data.append(
                [int(max(0.0001, extracted_data[0][i])),
                 int(max(0.0001, extracted_data[1][i])),
                 int(max(0.0001, extracted_data[2][i])),
                 int(max(0.0001, extracted_data[3][i]))])
            c = extracted_len[0][i]
            if counter != 0 and c != prev_c:
                ret_training_lengths.append(counter)
                counter = 0
            prev_c = c
            counter += 1
        debug('# extract_signals_from_regions: ret_training bins, data, lengths - poisson')
    # record the length of the last region
    ret_training_lengths.append(counter)
    assert sum(ret_training_lengths) == len(ret_training_data)
    assert len(ret_training_bins) == len(ret_training_data)
    return [ret_training_bins, ret_training_data, ret_training_lengths]
def _make_bdg_of_bins_from_regions(regions, binsize: cython.int):
    # this function will return a BedGraphTrackI object
    #
    # Each region is chopped into `binsize`-wide bins. All bins inside
    # the same region share the same mark value (mark_bin), which is
    # incremented when moving to the next region, so downstream code
    # can tell which bins belong together. Positions between covered
    # bins get value 0 and the track baseline is -100.
    assert isinstance(regions, Regions)
    regionsbdg = bedGraphTrackI(baseline_value=-100)
    n = 0  # total number of bins added, reported at the end
    # here we convert peaks from a PeakIO to BedGraph object with a
    # given binsize
    mark_bin = 1  # this is to mark the continuous bins in the same region, it will increase by one while moving to the next region
    for chrom in sorted(regions.get_chr_names()):
        tmp_p = 0  # this is to make gap in bedgraph for not covered regions.
        # list of (start, end) pairs for this chromosome -- TODO
        # confirm the attribute name on the Regions object
        ps = regions.regions[chrom]
        for i in range(len(ps)):
            s = ps[i][0]
            e = ps[i][1]
            # make bins, no need to be too accurate...
            s = s//binsize*binsize
            e = e//binsize*binsize
            # tmp_n = int((e - s)/binsize)
            for r in range(s, e, binsize):
                tmp_s = r
                tmp_e = r + binsize
                if tmp_s > tmp_p:
                    regionsbdg.add_loc_wo_merge(chrom, tmp_p, tmp_s, 0)  # the gap
                regionsbdg.add_loc_wo_merge(chrom, tmp_s, tmp_e, mark_bin)  # the value we put in the bin bedgraph is the number of bins in this region
                tmp_p = tmp_e
                n += 1
            # end of region, we change the mark_bin
            mark_bin += 1
    # we do not merge regions in regionsbdg object so each bin will be separated.
    debug(f"added {n} bins")
    return regionsbdg