update version. tweak the test (install cython 3.0 for non-x64)
[MACS.git] / test / test_Pileup.py
blob6abefa48d873843353d9fac2e67c592347cd9e9c
1 #!/usr/bin/env python
2 """Module Description: Test functions for pileup functions.
4 This code is free software; you can redistribute it and/or modify it
5 under the terms of the BSD License (see the file LICENSE included with
6 the distribution).
7 """
9 import unittest
10 import numpy as np
11 from math import log2
12 from MACS3.Signal.Pileup import *
13 from MACS3.Signal.PileupV2 import pileup_from_LR, pileup_from_PN
15 # ------------------------------------
16 # Main function
17 # ------------------------------------
19 class Test_SE_Pileup(unittest.TestCase):
20 """Unittest for pileup functions in Pileup.pyx for single-end
21 datasets.
23 Function to test: se_all_in_one_pileup
25 """
26 def setUp(self):
27 self.plus_pos = np.array(( 0, 1, 3 ), dtype="int32")
28 self.minus_pos = np.array(( 8, 9, 10 ), dtype="int32")
29 self.rlength = 100 # right end of coordinates
30 # expected result from pileup_bdg_se: ( start, end, value )
31 # the actual fragment length is 1+five_shift+three_shift
32 self.param_1 = { "five_shift": 0,
33 "three_shift": 5,
34 "scale_factor": 0.5,
35 "baseline": 0}
36 self.expect_pileup_1 = \
37 [ ( 0, 1, 0.5 ),
38 ( 1, 3, 1.0 ),
39 ( 3, 4, 2.0 ),
40 ( 4, 6, 2.5 ),
41 ( 6, 8, 2.0 ),
42 ( 8, 9, 1.0 ),
43 ( 9, 10, 0.5 ) ]
44 # expected result from pileup_w_multiple_d_bdg_se: ( start, end, value )
45 self.param_2 = { "five_shift": 0,
46 "three_shift": 10,
47 "scale_factor": 2,
48 "baseline": 8}
49 self.expect_pileup_2 = \
50 [ ( 0, 1, 8.0 ),
51 ( 1, 3, 10.0 ),
52 ( 3, 8, 12.0 ),
53 ( 8, 9, 10.0 ),
54 ( 9, 10, 8.0 ),
55 ( 10, 11, 8.0 ),
56 ( 11, 13, 8.0 ) ]
58 def test_pileup_1(self):
59 pileup = se_all_in_one_pileup( self.plus_pos, self.minus_pos,
60 self.param_1["five_shift"],
61 self.param_1["three_shift"],
62 self.rlength,
63 self.param_1["scale_factor"],
64 self.param_1["baseline"] )
65 result = []
66 (p,v) = pileup
67 pnext = iter(p).__next__
68 vnext = iter(v).__next__
69 pre = 0
70 for i in range(len(p)):
71 pos = pnext()
72 value = vnext()
73 result.append( (pre,pos,value) )
74 pre = pos
75 # check result
76 self.assertEqual( result, self.expect_pileup_1 )
78 def test_pileup_2(self):
79 pileup = se_all_in_one_pileup( self.plus_pos, self.minus_pos,
80 self.param_2["five_shift"],
81 self.param_2["three_shift"],
82 self.rlength,
83 self.param_2["scale_factor"],
84 self.param_2["baseline"] )
85 result = []
86 (p,v) = pileup
87 pnext = iter(p).__next__
88 vnext = iter(v).__next__
89 pre = 0
90 for i in range(len(p)):
91 pos = pnext()
92 value = vnext()
93 result.append( (pre,pos,value) )
94 pre = pos
95 # check result
96 self.assertEqual( result, self.expect_pileup_2 )
98 class Test_Quick_Pileup(unittest.TestCase):
99 """Unittest for pileup functions in Pileup.pyx for quick-pileup.
101 Function to test: quick_pileup
104 def setUp(self):
105 self.start_pos = np.array( ( 0, 1, 3, 3, 4, 5 ), dtype="int32")
106 self.end_pos = np.array( ( 5, 6, 8, 8, 9, 10 ), dtype="int32")
107 # expected result from pileup_bdg_se: ( start, end, value )
108 self.param_1 = { "scale_factor": 0.5,
109 "baseline": 0}
110 self.expect_pileup_1 = \
111 [ ( 0, 1, 0.5 ),
112 ( 1, 3, 1.0 ),
113 ( 3, 4, 2.0 ),
114 ( 4, 6, 2.5 ),
115 ( 6, 8, 2.0 ),
116 ( 8, 9, 1.0 ),
117 ( 9, 10, 0.5 ) ]
119 def test_pileup_1(self):
120 pileup = quick_pileup ( self.start_pos, self.end_pos,
121 self.param_1["scale_factor"],
122 self.param_1["baseline"] )
123 result = []
124 (p,v) = pileup
125 pnext = iter(p).__next__
126 vnext = iter(v).__next__
127 pre = 0
128 for i in range(len(p)):
129 pos = pnext()
130 value = vnext()
131 result.append( (pre,pos,value) )
132 pre = pos
133 # check result
134 self.assertEqual( result, self.expect_pileup_1 )
136 class Test_Naive_Pileup(unittest.TestCase):
137 """Unittest for pileup functions in Pileup.pyx for naive-quick-pileup.
139 Function to test: naive_quick_pileup
142 def setUp(self):
143 self.pos = np.array( ( 2, 3, 5, 5, 6, 7 ), dtype="int32")
144 # expected result from pileup_bdg_se: ( start, end, value )
145 self.param_1 = { "extension": 2 }
146 self.expect_pileup_1 = \
147 [ ( 0, 1, 1.0 ),
148 ( 1, 3, 2.0 ),
149 ( 3, 7, 4.0 ),
150 ( 7, 8, 2.0 ),
151 ( 8, 9, 1.0 ) ]
153 def test_pileup_1(self):
154 pileup = naive_quick_pileup ( self.pos,
155 self.param_1["extension"] )
156 result = []
157 (p,v) = pileup
158 print(p, v)
159 pnext = iter(p).__next__
160 vnext = iter(v).__next__
161 pre = 0
162 for i in range(len(p)):
163 pos = pnext()
164 value = vnext()
165 result.append( (pre,pos,value) )
166 pre = pos
167 #check result
168 self.assertEqual( result, self.expect_pileup_1 )
170 class Test_Over_Two_PV_Array(unittest.TestCase):
171 """Unittest for over_two_pv_array function
173 Function to test: over_two_pv_array
176 def setUp(self):
177 self.pv1 = [ np.array( ( 2, 5, 7, 8, 9, 12 ), dtype="int32" ),\
178 np.array( ( 1, 2, 3, 4, 3, 2), dtype="float32" ) ]
179 self.pv2 = [ np.array( ( 1, 4, 6, 8, 10, 11 ), dtype="int32" ),\
180 np.array( ( 5, 3, 2, 1, 0, 3), dtype="float32" ) ]
181 # expected result from pileup_bdg_se: ( start, end, value )
182 self.expect_pv_max = \
183 [ ( 0, 1, 5.0 ),
184 ( 1, 2, 3.0 ),
185 ( 2, 4, 3.0 ),
186 ( 4, 5, 2.0 ),
187 ( 5, 6, 3.0 ),
188 ( 6, 7, 3.0 ),
189 ( 7, 8, 4.0 ),
190 ( 8, 9, 3.0 ),
191 ( 9, 10, 2.0 ),
192 (10, 11, 3.0 ) ]
193 self.expect_pv_min = \
194 [ ( 0, 1, 1.0 ),
195 ( 1, 2, 1.0 ),
196 ( 2, 4, 2.0 ),
197 ( 4, 5, 2.0 ),
198 ( 5, 6, 2.0 ),
199 ( 6, 7, 1.0 ),
200 ( 7, 8, 1.0 ),
201 ( 8, 9, 0.0 ),
202 ( 9, 10, 0.0 ),
203 (10, 11, 2.0 ) ]
204 self.expect_pv_mean = \
205 [ ( 0, 1, 3.0 ),
206 ( 1, 2, 2.0 ),
207 ( 2, 4, 2.5 ),
208 ( 4, 5, 2.0 ),
209 ( 5, 6, 2.5 ),
210 ( 6, 7, 2.0 ),
211 ( 7, 8, 2.5 ),
212 ( 8, 9, 1.5 ),
213 ( 9, 10, 1.0 ),
214 (10, 11, 2.5 ) ]
216 def test_max(self):
217 pileup = over_two_pv_array ( self.pv1, self.pv2, func="max" )
218 result = []
219 (p,v) = pileup
220 print(p, v)
221 pnext = iter(p).__next__
222 vnext = iter(v).__next__
223 pre = 0
224 for i in range(len(p)):
225 pos = pnext()
226 value = vnext()
227 result.append( (pre,pos,value) )
228 pre = pos
229 #check result
230 self.assertEqual( result, self.expect_pv_max )
232 def test_min(self):
233 pileup = over_two_pv_array ( self.pv1, self.pv2, func="min" )
234 result = []
235 (p,v) = pileup
236 print(p, v)
237 pnext = iter(p).__next__
238 vnext = iter(v).__next__
239 pre = 0
240 for i in range(len(p)):
241 pos = pnext()
242 value = vnext()
243 result.append( (pre,pos,value) )
244 pre = pos
245 #check result
246 self.assertEqual( result, self.expect_pv_min )
248 def test_mean(self):
249 pileup = over_two_pv_array ( self.pv1, self.pv2, func="mean" )
250 result = []
251 (p,v) = pileup
252 print(p, v)
253 pnext = iter(p).__next__
254 vnext = iter(v).__next__
255 pre = 0
256 for i in range(len(p)):
257 pos = pnext()
258 value = vnext()
259 result.append( (pre,pos,value) )
260 pre = pos
261 #check result
262 self.assertEqual( result, self.expect_pv_mean )
264 class Test_PileupV2_PE(unittest.TestCase):
265 """Unittest for pileup functions in PileupV2.pyx.
267 Function to test: pileup_from_LR
270 def setUp(self):
271 self.LR_array1 = np.array( [ (1,5), (2,6), (4,8), (4,8), (5,9), (6,10),
272 (12,14), (13,17), (14,18), (17,19) ], dtype= [('l','int32'),('r','int32')])
273 # expected result from pileup_from_LR: ( end, value )
274 self.expect_pileup_1 = np.array( [ ( 1, 0.0 ),
275 ( 2, 1.0 ),
276 ( 4, 2.0 ),
277 ( 8, 4.0 ),
278 ( 9, 2.0 ),
279 ( 10, 1.0 ),
280 ( 12, 0.0 ),
281 ( 13, 1.0 ),
282 ( 18, 2.0 ),
283 ( 19, 1.0 ) ],
284 dtype=[ ( 'p', 'uint32' ), ( 'v', 'float32' ) ] )
285 # with log2(length) as weight
286 self.expect_pileup_2 = np.array( [ ( 1, 0.0 ),
287 ( 2, 2.0 ),
288 ( 4, 4.0 ),
289 ( 8, 8.0 ),
290 ( 9, 4.0 ),
291 ( 10, 2.0 ),
292 ( 12, 0.0 ),
293 ( 13, 1.0 ),
294 ( 14, 3.0 ),
295 ( 17, 4.0 ),
296 ( 18, 3.0 ),
297 ( 19, 1.0 ) ],
298 dtype=[ ( 'p', 'uint32' ), ( 'v', 'float32' ) ] )
300 def test_pileup_1(self):
301 pileup = pileup_from_LR( self.LR_array1 )
302 np.testing.assert_equal( pileup, self.expect_pileup_1 )
304 def test_pileup_2(self):
305 pileup = pileup_from_LR( self.LR_array1, lambda x,y: log2(y-x) )
306 np.testing.assert_equal( pileup, self.expect_pileup_2 )
308 class Test_PileupV2_SE(unittest.TestCase):
309 """Unittest for pileup functions in PileupV2.pyx.
311 Function to test: pileup_from_PN
314 def setUp(self):
315 self.P = np.array( ( 0, 1, 3, 3, 4, 5 ), dtype="int32") #plus strand pos
316 self.N = np.array( ( 5, 6, 8, 8, 9, 10 ), dtype="int32") #minus strand pos
317 # expected result from pileup_bdg_se: ( start, end, value )
318 self.extsize = 2
319 self.expect_pileup_1 = np.array( [ ( 1, 1.0 ),
320 ( 2, 2.0 ),
321 ( 3, 1.0 ),
322 ( 4, 3.0 ),
323 ( 5, 5.0 ),
324 ( 8, 3.0 ),
325 ( 9, 2.0 ),
326 ( 10, 1.0 ) ],
327 dtype=[ ( 'p', 'uint32' ), ( 'v', 'float32' ) ] )\
329 def test_pileup_1(self):
330 pileup = pileup_from_PN( self.P, self.N, self.extsize )
331 np.testing.assert_equal( pileup, self.expect_pileup_1 )