1 # cython: language_level=3
3 # Time-stamp: <2019-12-18 16:49:03 taoliu>
5 """Module Description: functions to find maxima minima or smooth the
8 This code is free software; you can redistribute it and/or modify it
9 under the terms of the BSD License (see the file LICENSE included with
16 from cpython cimport bool
17 from math
import sqrt
as mathsqrt
18 from math
import factorial
as mathfactorial
20 cpdef np
.ndarray
[np
.int32_t
, ndim
=1] maxima
(np
.ndarray
[np
.float32_t
, ndim
=1] signal
,
22 """return the local maxima in a signal after applying a 2nd order
23 Savitsky-Golay (polynomial) filter using window_size specified
26 np
.ndarray
[np
.int32_t
, ndim
=1] m
27 np
.ndarray
[np
.float64_t
, ndim
=1] smoothed
28 np
.ndarray
[np
.float32_t
, ndim
=1] sign
, diff
30 window_size
= window_size
//2*2+1 # to make an odd number
31 smoothed
= savitzky_golay_order2_deriv1
(signal
, window_size
).round(16)
32 sign
= np
.sign
( smoothed
.astype
("float32") )
33 diff
= np
.diff
( sign
)
34 m
= np
.where
( diff
<= -1)[0].astype
('int32')
37 cdef np
.ndarray
[np
.int32_t
, ndim
=1] internal_minima
( np
.ndarray
[np
.float32_t
, ndim
=1] signal
,
38 np
.ndarray
[np
.int32_t
, ndim
=1] maxima
):
40 np
.ndarray
[np
.int32_t
, ndim
=1] ret
41 int n
= maxima
.shape
[0]
44 ret
= np
.ndarray
(0, 'int32')
47 ret
= np
.zeros
(n
- 1, 'int32')
49 for i
in range(n
- 1):
51 ret
[i
] = np
.where
(signal
[pos1
:pos2
] == signal
[pos1
:pos2
].min())[0][0] + pos1
55 cdef inline
float sqrt
(float threshold
):
56 return mathsqrt
(threshold
)
58 cpdef enforce_peakyness
(np
.ndarray
[np
.float32_t
, ndim
=1] signal
,
59 np
.ndarray
[np
.int32_t
, ndim
=1] maxima
):
60 """requires peaks described by a signal and a set of points where the signal
61 is at a maximum to meet a certain set of criteria
63 maxima which do not meet the required criteria are discarded
67 calculate a threshold of the maximum of its adjacent two minima
68 plus the sqrt of that value
69 subtract the threshold from the region bounded by those minima
70 clip that region if negative values occur inside it
71 require it be > 50 bp in width
72 require that it not be too flat (< 6 unique values)
75 np
.ndarray
[np
.int32_t
, ndim
=1] minima
= internal_minima
(signal
, maxima
)
76 np
.ndarray
[np
.float32_t
, ndim
=1] new_signal
77 int n
= minima
.shape
[0]
79 np
.ndarray
[np
.int32_t
, ndim
=1] peaky_maxima
= maxima
.copy
()
81 if n
== 0: return maxima
83 threshold
= signal
[minima
[0]]
84 threshold
+= sqrt
(threshold
)
85 new_signal
= signal
[0:minima
[0]] - threshold
- sqrt
(threshold
)
86 # assert maxima[0] < minima[0], '%d > %d' % ( maxima[0], minima[0] )
87 if is_valid_peak
(new_signal
, maxima
[0]):
88 peaky_maxima
[0] = maxima
[0]
90 for i
in range(n
- 1):
91 threshold
= max(signal
[minima
[i
]], signal
[minima
[i
+ 1]])
92 threshold
+= sqrt
(threshold
)
93 new_signal
= signal
[minima
[i
]:minima
[i
+1]] - threshold
94 new_maximum
= maxima
[i
+1] - minima
[i
]
95 if is_valid_peak
(new_signal
, new_maximum
):
96 peaky_maxima
[j
] = maxima
[i
+ 1]
98 threshold
= signal
[minima
[-1]]
99 threshold
+= sqrt
(threshold
)
100 new_signal
= signal
[minima
[-1]:] - threshold
101 new_maximum
= maxima
[-1] - minima
[-1]
102 if is_valid_peak
(new_signal
, new_maximum
):
103 peaky_maxima
[j
] = maxima
[-1]
105 peaky_maxima
.resize
(j
, refcheck
=False
)
108 # hardcoded minimum peak width = 50
109 cdef bool is_valid_peak
(np
.ndarray
[np
.float32_t
, ndim
=1] signal
, int maximum
):
111 s
= hard_clip
(signal
, maximum
)
112 int length
= s
.shape
[0]
113 if length
< 50: return False
114 elif too_flat
(s
): return False
117 # require at least 6 different float values -- prevents broad flat peaks
118 cdef bool too_flat
(np
.ndarray
[np
.float32_t
, ndim
=1] signal
):
119 # """return whether signal has at least 6 unique values
121 return np
.unique
(signal
).shape
[0] < 6
123 # hard clip a region with negative values
124 cdef np
.ndarray
[np
.float32_t
, ndim
=1] hard_clip
(np
.ndarray
[np
.float32_t
, ndim
=1] signal
, int maximum
):
125 # """clip the signal in both directions at the nearest values <= 0
126 # to position maximum
131 int right
= signal
.shape
[0]
133 for i
in range(right
- maximum
, 0):
137 for i
in range(maximum
, right
):
141 return signal
[left
:right
]
143 # for all maxima, set min_subpeak_width = 0
144 #cpdef peak_maxima(np.ndarray[np.float32_t, ndim=1] signal,
145 # int window_size=51, int min_subpeak_width=5):
147 # np.ndarray[np.float32_t, ndim=1] D = savitzky_golay_order2(signal, window_size, deriv=1)
148 # np.ndarray[np.int32_t, ndim=1] m = np.where(np.diff(np.sign(D)) == 2)[0].astype('int32') + 1
149 # np.ndarray[np.int32_t, ndim=1] n = np.zeros_like(m)
151 # int halfw = (min_subpeak_width - 1) / 2
152 # int signalw = D.shape[0]
154 # int pos, start, end
155 # if m.shape[0] == 0: return m
156 # elif m.shape[0] == 1: return m
158 # i_max = np.where(signal[m] == signal[m].max())[0][0]
160 # start = max(pos - halfw, 0)
161 # end = min(pos + halfw, signalw)
162 # if ((D[start:pos] >= 0).all() and (D[(pos + 1):end] <= 0).all()):
166 # return m[i_max:(i_max + 1)]
168 # n.resize(i_new, refcheck=False)
171 cpdef enforce_valleys
(np
.ndarray
[np
.float32_t
, ndim
=1] signal
,
172 np
.ndarray
[np
.int32_t
, ndim
=1] summits
,
173 float min_valley
= 0.8):
174 """require a value of <= min_valley * lower summit
175 between each pair of summits
178 float req_min
, v
, prev_v
179 int summit_pos
, prev_summit_pos
180 int n_summits
= summits
.shape
[0]
181 int n_valid_summits
= 1
182 np
.ndarray
[np
.int32_t
, ndim
=1] valid_summits
= summits
.copy
()
183 # Step 1: Remove peaks that do not have sufficient valleys
184 if n_summits
== 1: return summits
185 for i
in range(1, n_summits
):
186 prev_summit_pos
= valid_summits
[n_valid_summits
-1]
187 summit_pos
= summits
[i
]
188 prev_v
= signal
[prev_summit_pos
]
189 v
= signal
[summit_pos
]
190 req_min
= min_valley
* min(prev_v
, v
)
191 if (signal
[prev_summit_pos
:summit_pos
] < req_min
).any
():
192 valid_summits
[n_valid_summits
] = summit_pos
195 valid_summits
[n_valid_summits
-1] = summit_pos
196 valid_summits
.resize
(n_valid_summits
, refcheck
=False
)
197 # Step 2: Re-find peaks from subtracted signal
201 # Modified from http://www.scipy.org/Cookbook/SavitzkyGolay
202 # positive window_size not enforced anymore
203 # needs sane input paramters, window size > 4
204 # switched to double precision for internal accuracy
205 cpdef np
.ndarray
[np
.float64_t
, ndim
=1] savitzky_golay_order2_deriv1
(np
.ndarray
[np
.float32_t
, ndim
=1] signal
,
207 """Smooth (and optionally differentiate) data with a Savitzky-Golay filter.
208 The Savitzky-Golay filter removes high frequency noise from data.
209 It has the advantage of preserving the original shape and
210 features of the signal better than other types of filtering
211 approaches, such as moving averages techhniques.
214 y : array_like, shape (N,)
215 the values of the time history of the signal.
217 the length of the window. Must be an odd integer number.
219 the order of the derivative to compute (default = 0 means only smoothing)
222 ys : ndarray, shape (N)
223 the smoothed signal (or it's n-th derivative).
226 The Savitzky-Golay is a type of low-pass filter, particularly
227 suited for smoothing noisy data. The main idea behind this
228 approach is to make for each point a least-square fit with a
229 polynomial of high order over a odd-sized window centered at
234 .. [1] A. Savitzky, M. J. E. Golay, Smoothing and Differentiation of
235 Data by Simplified Least Squares Procedures. Analytical
236 Chemistry, 1964, 36 (8), pp 1627-1639.
237 .. [2] Numerical Recipes 3rd Edition: The Art of Scientific Computing
238 W.H. Press, S.A. Teukolsky, W.T. Vetterling, B.P. Flannery
239 Cambridge University Press ISBN-13: 9780521880688
243 np
.ndarray
[np
.int64_t
, ndim
=2] b
244 # pad the signal at the extremes with
245 # values taken from the signal itself
246 np
.ndarray
[np
.float32_t
, ndim
=1] firstvals
, lastvals
247 np
.ndarray
[np
.float64_t
, ndim
=1] m
, ret
249 if window_size
% 2 != 1: window_size
+= 1
250 half_window
= (window_size
- 1) // 2
251 # precompute coefficients
252 b
= np
.array
([[1, k
, k
**2] for k
in range(-half_window
, half_window
+1)],
254 m
= np
.linalg
.pinv
(b
)[1]
255 # pad the signal at the extremes with
256 # values taken from the signal itself
257 firstvals
= signal
[0] - np
.abs(signal
[1:half_window
+1][::-1] - signal
[0])
258 lastvals
= signal
[-1] + np
.abs(signal
[-half_window
-1:-1][::-1] - signal
[-1])
259 signal
= np
.concatenate
((firstvals
, signal
, lastvals
))
261 ret
= np
.convolve
( m
[::-1], signal
.astype
("float64"), mode
='valid') #.astype("float32").round(8) # round to 8 decimals to avoid signing issue
262 #print (ret[160:165])
266 # Another modified version from http://www.scipy.org/Cookbook/SavitzkyGolay
267 cpdef np
.ndarray
[np
.float32_t
, ndim
=1] savitzky_golay
( np
.ndarray
[np
.float32_t
, ndim
=1] y
, int window_size
,
268 int order
, int deriv
= 0, int rate
= 1 ):
269 """Smooth (and optionally differentiate) data with a Savitzky-Golay filter.
270 The Savitzky-Golay filter removes high frequency noise from data.
271 It has the advantage of preserving the original shape and
272 features of the signal better than other types of filtering
273 approaches, such as moving averages techniques.
276 y : array_like, shape (N,)
277 the values of the time history of the signal.
279 the length of the window. Must be an odd integer number.
281 the order of the polynomial used in the filtering.
282 Must be less then `window_size` - 1.
284 the order of the derivative to compute (default = 0 means only smoothing)
287 ys : ndarray, shape (N)
288 the smoothed signal (or it's n-th derivative).
291 The Savitzky-Golay is a type of low-pass filter, particularly
292 suited for smoothing noisy data. The main idea behind this
293 approach is to make for each point a least-square fit with a
294 polynomial of high order over a odd-sized window centered at
298 t = np.linspace(-4, 4, 500)
299 y = np.exp( -t**2 ) + np.random.normal(0, 0.05, t.shape)
300 ysg = savitzky_golay(y, window_size=31, order=4)
301 import matplotlib.pyplot as plt
302 plt.plot(t, y, label='Noisy signal')
303 plt.plot(t, np.exp(-t**2), 'k', lw=1.5, label='Original signal')
304 plt.plot(t, ysg, 'r', label='Filtered signal')
309 .. [1] A. Savitzky, M. J. E. Golay, Smoothing and Differentiation of
310 Data by Simplified Least Squares Procedures. Analytical
311 Chemistry, 1964, 36 (8), pp 1627-1639.
312 .. [2] Numerical Recipes 3rd Edition: The Art of Scientific Computing
313 W.H. Press, S.A. Teukolsky, W.T. Vetterling, B.P. Flannery
314 Cambridge University Press ISBN-13: 9780521880688
318 np
.ndarray
[np
.int64_t
, ndim
=2] b
319 # pad the signal at the extremes with
320 # values taken from the signal itself
321 np
.ndarray
[np
.float32_t
, ndim
=1] firstvals
, lastvals
, ret
322 np
.ndarray
[np
.float64_t
, ndim
=1] m
325 window_size
= np
.abs( np
.int( window_size
) )
326 order
= np
.abs( np
.int( order
) )
327 except ValueError, msg
:
328 raise ValueError("window_size and order have to be of type int")
329 if window_size
% 2 != 1 or window_size
< 1:
330 raise TypeError("window_size size must be a positive odd number")
331 if window_size
< order
+ 2:
332 raise TypeError("window_size is too small for the polynomials order")
333 half_window
= ( window_size
-1 ) // 2
334 # precompute coefficients
335 b
= np
.array
( [ [ k
**i
for i
in range( order
+ 1 ) ] for k
in range( -half_window
, half_window
+1 ) ] )
336 m
= np
.linalg
.pinv
( b
)[ deriv
] * rate
**deriv
* mathfactorial
( deriv
)
337 # pad the signal at the extremes with
338 # values taken from the signal itself
339 firstvals
= y
[ 0 ] - np
.abs( y
[ 1:half_window
+ 1 ][ ::-1 ] - y
[ 0 ] )
340 lastvals
= y
[ -1 ] + np
.abs( y
[ -half_window
- 1:-1 ][ ::-1 ] - y
[ -1 ])
341 y
= np
.concatenate
( ( firstvals
, y
, lastvals
) )
342 ret
= np
.convolve
( m
[ ::-1 ], y
, mode
= 'valid' ).astype
("float32")