Update setup.py
[MACS.git] / test / test_Pileup.py
blob6972788fd154b38750d0df64ea94cf60ab996d3c
1 #!/usr/bin/env python
2 """Module Description: Test functions for pileup functions.
4 This code is free software; you can redistribute it and/or modify it
5 under the terms of the BSD License (see the file LICENSE included with
6 the distribution).
7 """
9 import unittest
10 import numpy as np
11 from math import log10
12 from MACS3.Signal.Pileup import *
14 # ------------------------------------
15 # Main function
16 # ------------------------------------
18 class Test_SE_Pileup(unittest.TestCase):
19 """Unittest for pileup functions in Pileup.pyx for single-end
20 datasets.
22 Function to test: se_all_in_one_pileup
24 """
25 def setUp(self):
26 self.plus_pos = np.array(( 0, 1, 3 ), dtype="int32")
27 self.minus_pos = np.array(( 8, 9, 10 ), dtype="int32")
28 self.rlength = 100 # right end of coordinates
29 # expected result from pileup_bdg_se: ( start, end, value )
30 # the actual fragment length is 1+five_shift+three_shift
31 self.param_1 = { "five_shift": 0,
32 "three_shift": 5,
33 "scale_factor": 0.5,
34 "baseline": 0}
35 self.expect_pileup_1 = \
36 [ ( 0, 1, 0.5 ),
37 ( 1, 3, 1.0 ),
38 ( 3, 4, 2.0 ),
39 ( 4, 6, 2.5 ),
40 ( 6, 8, 2.0 ),
41 ( 8, 9, 1.0 ),
42 ( 9, 10, 0.5 ) ]
43 # expected result from pileup_w_multiple_d_bdg_se: ( start, end, value )
44 self.param_2 = { "five_shift": 0,
45 "three_shift": 10,
46 "scale_factor": 2,
47 "baseline": 8}
48 self.expect_pileup_2 = \
49 [ ( 0, 1, 8.0 ),
50 ( 1, 3, 10.0 ),
51 ( 3, 8, 12.0 ),
52 ( 8, 9, 10.0 ),
53 ( 9, 10, 8.0 ),
54 ( 10, 11, 8.0 ),
55 ( 11, 13, 8.0 ) ]
57 def test_pileup_1(self):
58 pileup = se_all_in_one_pileup( self.plus_pos, self.minus_pos,
59 self.param_1["five_shift"],
60 self.param_1["three_shift"],
61 self.rlength,
62 self.param_1["scale_factor"],
63 self.param_1["baseline"] )
64 result = []
65 (p,v) = pileup
66 pnext = iter(p).__next__
67 vnext = iter(v).__next__
68 pre = 0
69 for i in range(len(p)):
70 pos = pnext()
71 value = vnext()
72 result.append( (pre,pos,value) )
73 pre = pos
74 # check result
75 self.assertEqual( result, self.expect_pileup_1 )
77 def test_pileup_2(self):
78 pileup = se_all_in_one_pileup( self.plus_pos, self.minus_pos,
79 self.param_2["five_shift"],
80 self.param_2["three_shift"],
81 self.rlength,
82 self.param_2["scale_factor"],
83 self.param_2["baseline"] )
84 result = []
85 (p,v) = pileup
86 pnext = iter(p).__next__
87 vnext = iter(v).__next__
88 pre = 0
89 for i in range(len(p)):
90 pos = pnext()
91 value = vnext()
92 result.append( (pre,pos,value) )
93 pre = pos
94 # check result
95 self.assertEqual( result, self.expect_pileup_2 )
97 class Test_Quick_Pileup(unittest.TestCase):
98 """Unittest for pileup functions in Pileup.pyx for quick-pileup.
100 Function to test: quick_pileup
103 def setUp(self):
104 self.start_pos = np.array( ( 0, 1, 3, 3, 4, 5 ), dtype="int32")
105 self.end_pos = np.array( ( 5, 6, 8, 8, 9, 10 ), dtype="int32")
106 # expected result from pileup_bdg_se: ( start, end, value )
107 self.param_1 = { "scale_factor": 0.5,
108 "baseline": 0}
109 self.expect_pileup_1 = \
110 [ ( 0, 1, 0.5 ),
111 ( 1, 3, 1.0 ),
112 ( 3, 4, 2.0 ),
113 ( 4, 6, 2.5 ),
114 ( 6, 8, 2.0 ),
115 ( 8, 9, 1.0 ),
116 ( 9, 10, 0.5 ) ]
118 def test_pileup_1(self):
119 pileup = quick_pileup ( self.start_pos, self.end_pos,
120 self.param_1["scale_factor"],
121 self.param_1["baseline"] )
122 result = []
123 (p,v) = pileup
124 pnext = iter(p).__next__
125 vnext = iter(v).__next__
126 pre = 0
127 for i in range(len(p)):
128 pos = pnext()
129 value = vnext()
130 result.append( (pre,pos,value) )
131 pre = pos
132 # check result
133 self.assertEqual( result, self.expect_pileup_1 )
135 class Test_Naive_Pileup(unittest.TestCase):
136 """Unittest for pileup functions in Pileup.pyx for naive-quick-pileup.
138 Function to test: naive_quick_pileup
141 def setUp(self):
142 self.pos = np.array( ( 2, 3, 5, 5, 6, 7 ), dtype="int32")
143 # expected result from pileup_bdg_se: ( start, end, value )
144 self.param_1 = { "extension": 2 }
145 self.expect_pileup_1 = \
146 [ ( 0, 1, 1.0 ),
147 ( 1, 3, 2.0 ),
148 ( 3, 7, 4.0 ),
149 ( 7, 8, 2.0 ),
150 ( 8, 9, 1.0 ) ]
152 def test_pileup_1(self):
153 pileup = naive_quick_pileup ( self.pos,
154 self.param_1["extension"] )
155 result = []
156 (p,v) = pileup
157 print(p, v)
158 pnext = iter(p).__next__
159 vnext = iter(v).__next__
160 pre = 0
161 for i in range(len(p)):
162 pos = pnext()
163 value = vnext()
164 result.append( (pre,pos,value) )
165 pre = pos
166 #check result
167 self.assertEqual( result, self.expect_pileup_1 )
169 class Test_Over_Two_PV_Array(unittest.TestCase):
170 """Unittest for over_two_pv_array function
172 Function to test: over_two_pv_array
175 def setUp(self):
176 self.pv1 = [ np.array( ( 2, 5, 7, 8, 9, 12 ), dtype="int32" ),\
177 np.array( ( 1, 2, 3, 4, 3, 2), dtype="float32" ) ]
178 self.pv2 = [ np.array( ( 1, 4, 6, 8, 10, 11 ), dtype="int32" ),\
179 np.array( ( 5, 3, 2, 1, 0, 3), dtype="float32" ) ]
180 # expected result from pileup_bdg_se: ( start, end, value )
181 self.expect_pv_max = \
182 [ ( 0, 1, 5.0 ),
183 ( 1, 2, 3.0 ),
184 ( 2, 4, 3.0 ),
185 ( 4, 5, 2.0 ),
186 ( 5, 6, 3.0 ),
187 ( 6, 7, 3.0 ),
188 ( 7, 8, 4.0 ),
189 ( 8, 9, 3.0 ),
190 ( 9, 10, 2.0 ),
191 (10, 11, 3.0 ) ]
192 self.expect_pv_min = \
193 [ ( 0, 1, 1.0 ),
194 ( 1, 2, 1.0 ),
195 ( 2, 4, 2.0 ),
196 ( 4, 5, 2.0 ),
197 ( 5, 6, 2.0 ),
198 ( 6, 7, 1.0 ),
199 ( 7, 8, 1.0 ),
200 ( 8, 9, 0.0 ),
201 ( 9, 10, 0.0 ),
202 (10, 11, 2.0 ) ]
203 self.expect_pv_mean = \
204 [ ( 0, 1, 3.0 ),
205 ( 1, 2, 2.0 ),
206 ( 2, 4, 2.5 ),
207 ( 4, 5, 2.0 ),
208 ( 5, 6, 2.5 ),
209 ( 6, 7, 2.0 ),
210 ( 7, 8, 2.5 ),
211 ( 8, 9, 1.5 ),
212 ( 9, 10, 1.0 ),
213 (10, 11, 2.5 ) ]
215 def test_max(self):
216 pileup = over_two_pv_array ( self.pv1, self.pv2, func="max" )
217 result = []
218 (p,v) = pileup
219 print(p, v)
220 pnext = iter(p).__next__
221 vnext = iter(v).__next__
222 pre = 0
223 for i in range(len(p)):
224 pos = pnext()
225 value = vnext()
226 result.append( (pre,pos,value) )
227 pre = pos
228 #check result
229 self.assertEqual( result, self.expect_pv_max )
231 def test_min(self):
232 pileup = over_two_pv_array ( self.pv1, self.pv2, func="min" )
233 result = []
234 (p,v) = pileup
235 print(p, v)
236 pnext = iter(p).__next__
237 vnext = iter(v).__next__
238 pre = 0
239 for i in range(len(p)):
240 pos = pnext()
241 value = vnext()
242 result.append( (pre,pos,value) )
243 pre = pos
244 #check result
245 self.assertEqual( result, self.expect_pv_min )
247 def test_mean(self):
248 pileup = over_two_pv_array ( self.pv1, self.pv2, func="mean" )
249 result = []
250 (p,v) = pileup
251 print(p, v)
252 pnext = iter(p).__next__
253 vnext = iter(v).__next__
254 pre = 0
255 for i in range(len(p)):
256 pos = pnext()
257 value = vnext()
258 result.append( (pre,pos,value) )
259 pre = pos
260 #check result
261 self.assertEqual( result, self.expect_pv_mean )