Merge pull request #378 from taoliu/fix_setup_script_364
[MACS.git] / MACS2 / Signal.pyx
blob9a0b28b948b183f229a8d65cfd69be4db998f9f8
1 # cython: language_level=3
2 # cython: profile=True
3 # Time-stamp: <2019-12-18 16:49:03 taoliu>
5 """Module Description: functions to find maxima minima or smooth the
6 signal tracks.
8 This code is free software; you can redistribute it and/or modify it
9 under the terms of the BSD License (see the file LICENSE included with
10 the distribution).
11 """
13 # smoothing function
14 import numpy as np
15 cimport numpy as np
16 from cpython cimport bool
17 from math import sqrt as mathsqrt
18 from math import factorial as mathfactorial
20 cpdef np.ndarray[np.int32_t, ndim=1] maxima(np.ndarray[np.float32_t, ndim=1] signal,
21 int window_size=51):
22 """return the local maxima in a signal after applying a 2nd order
23 Savitsky-Golay (polynomial) filter using window_size specified
24 """
25 cdef:
26 np.ndarray[np.int32_t, ndim=1] m
27 np.ndarray[np.float64_t, ndim=1] smoothed
28 np.ndarray[np.float32_t, ndim=1] sign, diff
30 window_size = window_size//2*2+1 # to make an odd number
31 smoothed = savitzky_golay_order2_deriv1(signal, window_size).round(16)
32 sign = np.sign( smoothed.astype("float32") )
33 diff = np.diff( sign )
34 m = np.where( diff <= -1)[0].astype('int32')
35 return m
37 cdef np.ndarray[np.int32_t, ndim=1] internal_minima( np.ndarray[np.float32_t, ndim=1] signal,
38 np.ndarray[np.int32_t, ndim=1] maxima ):
39 cdef:
40 np.ndarray[np.int32_t, ndim=1] ret
41 int n = maxima.shape[0]
42 int i, v, v2
43 if n == 0 or n == 1:
44 ret = np.ndarray(0, 'int32')
45 return ret
46 else:
47 ret = np.zeros(n - 1, 'int32')
48 pos1 = maxima[0]
49 for i in range(n - 1):
50 pos2 = maxima[i + 1]
51 ret[i] = np.where(signal[pos1:pos2] == signal[pos1:pos2].min())[0][0] + pos1
52 pos1 = pos2
53 return ret
55 cdef inline float sqrt(float threshold):
56 return mathsqrt(threshold)
58 cpdef enforce_peakyness(np.ndarray[np.float32_t, ndim=1] signal,
59 np.ndarray[np.int32_t, ndim=1] maxima):
60 """requires peaks described by a signal and a set of points where the signal
61 is at a maximum to meet a certain set of criteria
63 maxima which do not meet the required criteria are discarded
65 criteria:
66 for each peak:
67 calculate a threshold of the maximum of its adjacent two minima
68 plus the sqrt of that value
69 subtract the threshold from the region bounded by those minima
70 clip that region if negative values occur inside it
71 require it be > 50 bp in width
72 require that it not be too flat (< 6 unique values)
73 """
74 cdef:
75 np.ndarray[np.int32_t, ndim=1] minima = internal_minima(signal, maxima)
76 np.ndarray[np.float32_t, ndim=1] new_signal
77 int n = minima.shape[0]
78 float threshold
79 np.ndarray[np.int32_t, ndim=1] peaky_maxima = maxima.copy()
80 int j = 0
81 if n == 0: return maxima
82 # else:
83 threshold = signal[minima[0]]
84 threshold += sqrt(threshold)
85 new_signal = signal[0:minima[0]] - threshold - sqrt(threshold)
86 # assert maxima[0] < minima[0], '%d > %d' % ( maxima[0], minima[0] )
87 if is_valid_peak(new_signal, maxima[0]):
88 peaky_maxima[0] = maxima[0]
89 j += 1
90 for i in range(n - 1):
91 threshold = max(signal[minima[i]], signal[minima[i + 1]])
92 threshold += sqrt(threshold)
93 new_signal = signal[minima[i]:minima[i+1]] - threshold
94 new_maximum = maxima[i+1] - minima[i]
95 if is_valid_peak(new_signal, new_maximum):
96 peaky_maxima[j] = maxima[i + 1]
97 j += 1
98 threshold = signal[minima[-1]]
99 threshold += sqrt(threshold)
100 new_signal = signal[minima[-1]:] - threshold
101 new_maximum = maxima[-1] - minima[-1]
102 if is_valid_peak(new_signal, new_maximum):
103 peaky_maxima[j] = maxima[-1]
104 j += 1
105 peaky_maxima.resize(j, refcheck=False)
106 return peaky_maxima
108 # hardcoded minimum peak width = 50
109 cdef bool is_valid_peak(np.ndarray[np.float32_t, ndim=1] signal, int maximum):
110 cdef:
111 s = hard_clip(signal, maximum)
112 int length = s.shape[0]
113 if length < 50: return False
114 elif too_flat(s): return False
115 return True
117 # require at least 6 different float values -- prevents broad flat peaks
118 cdef bool too_flat(np.ndarray[np.float32_t, ndim=1] signal):
119 # """return whether signal has at least 6 unique values
120 # """
121 return np.unique(signal).shape[0] < 6
123 # hard clip a region with negative values
124 cdef np.ndarray[np.float32_t, ndim=1] hard_clip(np.ndarray[np.float32_t, ndim=1] signal, int maximum):
125 # """clip the signal in both directions at the nearest values <= 0
126 # to position maximum
127 # """
128 cdef:
129 int i
130 int left = 0
131 int right = signal.shape[0]
132 # clip left
133 for i in range(right - maximum, 0):
134 if signal[-i] < 0:
135 left = i
136 break
137 for i in range(maximum, right):
138 if signal[i] < 0:
139 right = i
140 break
141 return signal[left:right]
143 # for all maxima, set min_subpeak_width = 0
144 #cpdef peak_maxima(np.ndarray[np.float32_t, ndim=1] signal,
145 # int window_size=51, int min_subpeak_width=5):
146 # cdef:
147 # np.ndarray[np.float32_t, ndim=1] D = savitzky_golay_order2(signal, window_size, deriv=1)
148 # np.ndarray[np.int32_t, ndim=1] m = np.where(np.diff(np.sign(D)) == 2)[0].astype('int32') + 1
149 # np.ndarray[np.int32_t, ndim=1] n = np.zeros_like(m)
150 # int i_max
151 # int halfw = (min_subpeak_width - 1) / 2
152 # int signalw = D.shape[0]
153 # int i_new = 0
154 # int pos, start, end
155 # if m.shape[0] == 0: return m
156 # elif m.shape[0] == 1: return m
157 # else:
158 # i_max = np.where(signal[m] == signal[m].max())[0][0]
159 # for pos in m:
160 # start = max(pos - halfw, 0)
161 # end = min(pos + halfw, signalw)
162 # if ((D[start:pos] >= 0).all() and (D[(pos + 1):end] <= 0).all()):
163 # n[i_new] = pos
164 # i_new += 1
165 # if i_new == 0:
166 # return m[i_max:(i_max + 1)]
167 # else:
168 # n.resize(i_new, refcheck=False)
169 # return n
171 cpdef enforce_valleys(np.ndarray[np.float32_t, ndim=1] signal,
172 np.ndarray[np.int32_t, ndim=1] summits,
173 float min_valley = 0.8):
174 """require a value of <= min_valley * lower summit
175 between each pair of summits
177 cdef:
178 float req_min, v, prev_v
179 int summit_pos, prev_summit_pos
180 int n_summits = summits.shape[0]
181 int n_valid_summits = 1
182 np.ndarray[np.int32_t, ndim=1] valid_summits = summits.copy()
183 # Step 1: Remove peaks that do not have sufficient valleys
184 if n_summits == 1: return summits
185 for i in range(1, n_summits):
186 prev_summit_pos = valid_summits[n_valid_summits-1]
187 summit_pos = summits[i]
188 prev_v = signal[prev_summit_pos]
189 v = signal[summit_pos]
190 req_min = min_valley * min(prev_v, v)
191 if (signal[prev_summit_pos:summit_pos] < req_min).any():
192 valid_summits[n_valid_summits] = summit_pos
193 n_valid_summits += 1
194 elif v > prev_v:
195 valid_summits[n_valid_summits-1] = summit_pos
196 valid_summits.resize(n_valid_summits, refcheck=False)
197 # Step 2: Re-find peaks from subtracted signal
199 return valid_summits
201 # Modified from http://www.scipy.org/Cookbook/SavitzkyGolay
202 # positive window_size not enforced anymore
203 # needs sane input paramters, window size > 4
204 # switched to double precision for internal accuracy
205 cpdef np.ndarray[np.float64_t, ndim=1] savitzky_golay_order2_deriv1(np.ndarray[np.float32_t, ndim=1] signal,
206 int window_size):
207 """Smooth (and optionally differentiate) data with a Savitzky-Golay filter.
208 The Savitzky-Golay filter removes high frequency noise from data.
209 It has the advantage of preserving the original shape and
210 features of the signal better than other types of filtering
211 approaches, such as moving averages techhniques.
212 Parameters
213 ----------
214 y : array_like, shape (N,)
215 the values of the time history of the signal.
216 window_size : int
217 the length of the window. Must be an odd integer number.
218 deriv: int
219 the order of the derivative to compute (default = 0 means only smoothing)
220 Returns
221 -------
222 ys : ndarray, shape (N)
223 the smoothed signal (or it's n-th derivative).
224 Notes
225 -----
226 The Savitzky-Golay is a type of low-pass filter, particularly
227 suited for smoothing noisy data. The main idea behind this
228 approach is to make for each point a least-square fit with a
229 polynomial of high order over a odd-sized window centered at
230 the point.
232 References
233 ----------
234 .. [1] A. Savitzky, M. J. E. Golay, Smoothing and Differentiation of
235 Data by Simplified Least Squares Procedures. Analytical
236 Chemistry, 1964, 36 (8), pp 1627-1639.
237 .. [2] Numerical Recipes 3rd Edition: The Art of Scientific Computing
238 W.H. Press, S.A. Teukolsky, W.T. Vetterling, B.P. Flannery
239 Cambridge University Press ISBN-13: 9780521880688
241 cdef:
242 int half_window, k
243 np.ndarray[np.int64_t, ndim=2] b
244 # pad the signal at the extremes with
245 # values taken from the signal itself
246 np.ndarray[np.float32_t, ndim=1] firstvals, lastvals
247 np.ndarray[np.float64_t, ndim=1] m, ret
249 if window_size % 2 != 1: window_size += 1
250 half_window = (window_size - 1) // 2
251 # precompute coefficients
252 b = np.array([[1, k, k**2] for k in range(-half_window, half_window+1)],
253 dtype='int64')
254 m = np.linalg.pinv(b)[1]
255 # pad the signal at the extremes with
256 # values taken from the signal itself
257 firstvals = signal[0] - np.abs(signal[1:half_window+1][::-1] - signal[0])
258 lastvals = signal[-1] + np.abs(signal[-half_window-1:-1][::-1] - signal[-1])
259 signal = np.concatenate((firstvals, signal, lastvals))
260 #print (repr(m))
261 ret = np.convolve( m[::-1], signal.astype("float64"), mode='valid') #.astype("float32").round(8) # round to 8 decimals to avoid signing issue
262 #print (ret[160:165])
263 return ret
266 # Another modified version from http://www.scipy.org/Cookbook/SavitzkyGolay
267 cpdef np.ndarray[np.float32_t, ndim=1] savitzky_golay( np.ndarray[np.float32_t, ndim=1] y, int window_size,
268 int order, int deriv = 0, int rate = 1 ):
269 """Smooth (and optionally differentiate) data with a Savitzky-Golay filter.
270 The Savitzky-Golay filter removes high frequency noise from data.
271 It has the advantage of preserving the original shape and
272 features of the signal better than other types of filtering
273 approaches, such as moving averages techniques.
274 Parameters
275 ----------
276 y : array_like, shape (N,)
277 the values of the time history of the signal.
278 window_size : int
279 the length of the window. Must be an odd integer number.
280 order : int
281 the order of the polynomial used in the filtering.
282 Must be less then `window_size` - 1.
283 deriv: int
284 the order of the derivative to compute (default = 0 means only smoothing)
285 Returns
286 -------
287 ys : ndarray, shape (N)
288 the smoothed signal (or it's n-th derivative).
289 Notes
290 -----
291 The Savitzky-Golay is a type of low-pass filter, particularly
292 suited for smoothing noisy data. The main idea behind this
293 approach is to make for each point a least-square fit with a
294 polynomial of high order over a odd-sized window centered at
295 the point.
296 Examples
297 --------
298 t = np.linspace(-4, 4, 500)
299 y = np.exp( -t**2 ) + np.random.normal(0, 0.05, t.shape)
300 ysg = savitzky_golay(y, window_size=31, order=4)
301 import matplotlib.pyplot as plt
302 plt.plot(t, y, label='Noisy signal')
303 plt.plot(t, np.exp(-t**2), 'k', lw=1.5, label='Original signal')
304 plt.plot(t, ysg, 'r', label='Filtered signal')
305 plt.legend()
306 plt.show()
307 References
308 ----------
309 .. [1] A. Savitzky, M. J. E. Golay, Smoothing and Differentiation of
310 Data by Simplified Least Squares Procedures. Analytical
311 Chemistry, 1964, 36 (8), pp 1627-1639.
312 .. [2] Numerical Recipes 3rd Edition: The Art of Scientific Computing
313 W.H. Press, S.A. Teukolsky, W.T. Vetterling, B.P. Flannery
314 Cambridge University Press ISBN-13: 9780521880688
316 cdef:
317 int half_window, k
318 np.ndarray[np.int64_t, ndim=2] b
319 # pad the signal at the extremes with
320 # values taken from the signal itself
321 np.ndarray[np.float32_t, ndim=1] firstvals, lastvals, ret
322 np.ndarray[np.float64_t, ndim=1] m
324 try:
325 window_size = np.abs( np.int( window_size ) )
326 order = np.abs( np.int( order ) )
327 except ValueError, msg:
328 raise ValueError("window_size and order have to be of type int")
329 if window_size % 2 != 1 or window_size < 1:
330 raise TypeError("window_size size must be a positive odd number")
331 if window_size < order + 2:
332 raise TypeError("window_size is too small for the polynomials order")
333 half_window = ( window_size -1 ) // 2
334 # precompute coefficients
335 b = np.array( [ [ k**i for i in range( order + 1 ) ] for k in range( -half_window, half_window+1 ) ] )
336 m = np.linalg.pinv( b )[ deriv ] * rate**deriv * mathfactorial( deriv )
337 # pad the signal at the extremes with
338 # values taken from the signal itself
339 firstvals = y[ 0 ] - np.abs( y[ 1:half_window + 1 ][ ::-1 ] - y[ 0 ] )
340 lastvals = y[ -1 ] + np.abs( y[ -half_window - 1:-1 ][ ::-1 ] - y[ -1 ])
341 y = np.concatenate( ( firstvals, y, lastvals ) )
342 ret = np.convolve( m[ ::-1 ], y, mode = 'valid' ).astype("float32")
343 return ret