implement PETrackII for fragment files from scATAC experiments
[MACS.git] / MACS3 / Signal / HMMR_Signal_Processing.py
bloba6c4a8cba917b1d1e40b034422bf1d5e1719117c
1 # cython: language_level=3
2 # cython: profile=True
3 # Time-stamp: <2024-10-14 17:04:27 Tao Liu>
5 """Module description:
7 This code is free software; you can redistribute it and/or modify it
8 under the terms of the BSD License (see the file LICENSE included with
9 the distribution).
10 """
11 # ------------------------------------
12 # python modules
13 # ------------------------------------
14 import cython
15 from MACS3.Signal.Prob import pnorm2
16 from MACS3.Signal.BedGraph import bedGraphTrackI
17 from MACS3.Signal.Region import Regions
18 from MACS3.Utilities.Logger import logging
20 logger = logging.getLogger(__name__)
21 debug = logger.debug
22 info = logger.info
24 # ------------------------------------
25 # Misc functions
26 # ------------------------------------
@cython.inline
@cython.cfunc
def get_weighted_density(x: cython.int, m: cython.float, v:
                         cython.float, w: cython.float) -> cython.float:
    """Return the weighted Gaussian density of an observed value.

    parameters:
      1. x: the observed value
      2. m: the mean of the Gaussian
      3. v: the variance of the Gaussian
      4. w: the weight of this Gaussian component
    return value:
      w * N(x; m, v) -- the density of x under the Gaussian (m, v),
      scaled by the weight w.
    """
    return w * pnorm2(float(x), m, v)
45 # ------------------------------------
46 # Classes
47 # ------------------------------------
48 # ------------------------------------
49 # public functions
50 # ------------------------------------
@cython.ccall
def generate_weight_mapping(fraglen_list: list, means: list, stddevs:
                            list, min_frag_p: cython.float = 0.001) -> list:
    """Generate weights for each fragment length in short, mono, di,
    and tri-signals track.

    parameters:
      1. fraglen_list: list of fragment lengths (int)
      2. means: the means of the four Gaussians, in the order
         [short, mono, di, tri]
      3. stddevs: the standard deviations of the four Gaussians, in
         the same order
      4. min_frag_p: if the likelihood of a fragment length is below
         this value under all four distributions, the fragment length
         is excluded by setting all four of its weights to 0.
         Default: 0.001

    return: list of four dictionaries, with key as fraglen and value
    as the weight.

    ret[0] -- dictionary for short
    ret[1] -- dictionary for mono
    ret[2] -- dictionary for di
    ret[3] -- dictionary for tri
    """
    ret_mapping: list
    fl: cython.int
    m_s: cython.float
    m_m: cython.float
    m_d: cython.float
    m_t: cython.float
    v_s: cython.float
    v_m: cython.float
    v_d: cython.float
    v_t: cython.float
    p_s: cython.float
    p_m: cython.float
    p_d: cython.float
    p_t: cython.float
    s: cython.float

    assert len(means) == 4
    assert len(stddevs) == 4
    [m_s, m_m, m_d, m_t] = means
    # square the standard deviations once; the v_* values are passed
    # to pnorm2 as variances
    [v_s, v_m, v_d, v_t] = [x**2 for x in stddevs]
    ret_mapping = [{}, {}, {}, {}]
    for fl in fraglen_list:
        p_s = pnorm2(float(fl), m_s, v_s)
        p_m = pnorm2(float(fl), m_m, v_m)
        p_d = pnorm2(float(fl), m_d, v_d)
        p_t = pnorm2(float(fl), m_t, v_t)
        s = p_s + p_m + p_d + p_t
        if p_s < min_frag_p and p_m < min_frag_p and p_d < min_frag_p and p_t < min_frag_p:
            # we exclude the fragment which can't be assigned to
            # short, mono, di-nuc, and tri-nuc (likelihood <
            # min_frag_p, default:0.001) Normally this fragment is too
            # large. We exclude these fragment by setting all weights
            # to zero.
            debug(f"The fragment length {fl} can't be assigned to either distribution so will be excluded!")
            ret_mapping[0][fl] = 0
            ret_mapping[1][fl] = 0
            ret_mapping[2][fl] = 0
            ret_mapping[3][fl] = 0
            continue
        # normalize the four likelihoods so the weights sum to 1
        ret_mapping[0][fl] = p_s / s
        ret_mapping[1][fl] = p_m / s
        ret_mapping[2][fl] = p_d / s
        ret_mapping[3][fl] = p_t / s
    return ret_mapping
@cython.ccall
def generate_digested_signals(petrack, weight_mapping: list) -> list:
    """Generate digested pileup signals (four tracks) using weight mapping.

    parameters:
      1. petrack: paired-end track object; must provide
         pileup_bdg_hmmr(weight_mapping) returning a list of four
         dictionaries keyed by chromosome name with p-v arrays as
         values
      2. weight_mapping: list of four fraglen->weight dictionaries,
         as produced by generate_weight_mapping

    return: list of four bedGraphTrackI objects built from the pileup
    dictionaries.

    ret[0] -- track for short
    ret[1] -- track for mono
    ret[2] -- track for di
    ret[3] -- track for tri
    """
    ret_digested_signals: list
    ret_bedgraphs: list
    bdg: object
    i: int
    certain_signals: dict
    chrom: bytes

    ret_digested_signals = petrack.pileup_bdg_hmmr(weight_mapping)
    ret_bedgraphs = []
    for i in range(4):          # yes I hardcoded 4!
        certain_signals = ret_digested_signals[i]
        bdg = bedGraphTrackI()
        # add chromosomes in a deterministic (sorted) order
        for chrom in sorted(certain_signals.keys()):
            bdg.add_chrom_data_PV(chrom, certain_signals[chrom])
        ret_bedgraphs.append(bdg)
    return ret_bedgraphs
@cython.ccall
def extract_signals_from_regions(signals: list, regions, binsize:
                                 cython.int = 10, hmm_type: str = 'gaussian') -> list:
    """Extract per-bin values of the four signal tracks inside regions.

    We take the regions, create a bedGraphTrackI with binned regions,
    then let it overlap with the signals to create value arrays for
    training.

    parameters:
      1. signals: list of four bedGraphTrackI signal tracks (short,
         mono, di, tri)
      2. regions: a Regions object describing the regions of interest
      3. binsize: size of the bins tiled over each region. Default: 10
      4. hmm_type: 'gaussian' keeps float values (floored at 0.0001);
         'poisson' truncates them to int since Poisson emissions only
         take integer input. Any other value produces empty training
         data.

    return: [ret_training_bins, ret_training_data,
    ret_training_lengths]: the bin positions, the per-bin rows of four
    signal values, and the lengths of consecutive-bin runs.
    """
    extracted_data: list
    extracted_len: list
    extracted_positions: list
    signaltrack: object
    regionsbdg: object
    i: cython.int
    c: cython.int
    counter: cython.int
    prev_c: cython.int
    ret_training_data: list
    ret_training_lengths: list
    ret_training_bins: list

    regionsbdg = _make_bdg_of_bins_from_regions(regions, binsize)
    debug('# extract_signals_from_regions: regionsbdg completed')
    # now, let's overlap
    extracted_positions = []
    extracted_data = []
    extracted_len = []
    for signaltrack in signals:  # four signal tracks
        # signaltrack is bedGraphTrackI object
        [positions, values, lengths] = signaltrack.extract_value_hmmr(regionsbdg)
        extracted_positions.append(positions)
        extracted_data.append(values)
        extracted_len.append(lengths)
        # drop the working references right away to limit peak memory
        positions = []
        values = []
        lengths = []
    debug('# extract_signals_from_regions: extracted positions, data, len')
    ret_training_bins = []
    ret_training_data = []
    ret_training_lengths = []

    nn = len(extracted_data[0])
    assert nn > 0
    assert nn == len(extracted_data[1])
    assert nn == len(extracted_data[2])
    assert nn == len(extracted_data[3])
    counter = 0
    prev_c = extracted_len[0][0]
    c = 0
    if hmm_type in ("gaussian", "poisson"):
        # The two emission types share the same accumulation loop;
        # poisson can only take int values as input, so its rows are
        # additionally truncated (values < 1 become 0).
        as_int = (hmm_type == "poisson")
        for i in range(nn):
            ret_training_bins.append(extracted_positions[0][i])
            row = [max(0.0001, extracted_data[0][i]),
                   max(0.0001, extracted_data[1][i]),
                   max(0.0001, extracted_data[2][i]),
                   max(0.0001, extracted_data[3][i])]
            if as_int:
                row = [int(v) for v in row]
            ret_training_data.append(row)
            # close the current run of bins whenever the length marker
            # changes -- presumably this indicates moving to the next
            # region; TODO confirm against extract_value_hmmr
            c = extracted_len[0][i]
            if counter != 0 and c != prev_c:
                ret_training_lengths.append(counter)
                counter = 0
            prev_c = c
            counter += 1
        debug('# extract_signals_from_regions: ret_training bins, data, lengths - ' + hmm_type)
    # last region
    ret_training_lengths.append(counter)
    assert sum(ret_training_lengths) == len(ret_training_data)
    assert len(ret_training_bins) == len(ret_training_data)
    return [ret_training_bins, ret_training_data, ret_training_lengths]
@cython.cfunc
def _make_bdg_of_bins_from_regions(regions, binsize: cython.int):
    # Build and return a bedGraphTrackI that tiles every region with
    # fixed-size bins. All bins of one region carry the same marker
    # value (mark_bin), which increases by one for each region, so
    # consecutive bins of the same region can be recognized later.
    # Stretches between bins are filled with explicit zero-value gaps.
    regionsbdg: object
    n: cython.long
    chrom: bytes
    ps: list
    s: cython.int
    e: cython.int
    tmp_p: cython.int
    mark_bin: cython.int
    i: cython.int
    r: cython.int

    assert isinstance(regions, Regions)

    regionsbdg = bedGraphTrackI(baseline_value=-100)

    n = 0                       # total number of bins emitted
    mark_bin = 1                # per-region marker, bumped after each region
    for chrom in sorted(regions.get_chr_names()):
        tmp_p = 0               # end of the previous bin; used to detect gaps
        ps = regions[chrom]
        for i in range(len(ps)):
            # snap both ends of the region down to bin boundaries --
            # no need to be too accurate here
            s = (ps[i][0] // binsize) * binsize
            e = (ps[i][1] // binsize) * binsize
            for r in range(s, e, binsize):
                if r > tmp_p:
                    # uncovered stretch before this bin: emit the gap
                    regionsbdg.add_loc_wo_merge(chrom, tmp_p, r, 0)
                regionsbdg.add_loc_wo_merge(chrom, r, r + binsize, mark_bin)
                n += 1
                tmp_p = r + binsize
            # end of region: switch to the next marker value
            mark_bin += 1
    # regions are added without merging so each bin stays separate
    debug(f"added {n} bins")
    return regionsbdg