2 """Module Description: Test functions for pileup functions.
4 This code is free software; you can redistribute it and/or modify it
5 under the terms of the BSD License (see the file LICENSE included with
12 from MACS3
.Signal
.Pileup
import *
13 from MACS3
.Signal
.PileupV2
import pileup_from_LR
, pileup_from_PN
15 # ------------------------------------
17 # ------------------------------------
19 class Test_SE_Pileup(unittest
.TestCase
):
20 """Unittest for pileup functions in Pileup.pyx for single-end
23 Function to test: se_all_in_one_pileup
27 self
.plus_pos
= np
.array(( 0, 1, 3 ), dtype
="int32")
28 self
.minus_pos
= np
.array(( 8, 9, 10 ), dtype
="int32")
29 self
.rlength
= 100 # right end of coordinates
30 # expected result from se_all_in_one_pileup with param_1: ( start, end, value )
31 # the actual fragment length is 1+five_shift+three_shift
32 self
.param_1
= { "five_shift": 0,
36 self
.expect_pileup_1
= \
44 # expected result from se_all_in_one_pileup with param_2: ( start, end, value )
45 self
.param_2
= { "five_shift": 0,
49 self
.expect_pileup_2
= \
58 def test_pileup_1(self
):
59 pileup
= se_all_in_one_pileup( self
.plus_pos
, self
.minus_pos
,
60 self
.param_1
["five_shift"],
61 self
.param_1
["three_shift"],
63 self
.param_1
["scale_factor"],
64 self
.param_1
["baseline"] )
67 pnext
= iter(p
).__next
__
68 vnext
= iter(v
).__next
__
70 for i
in range(len(p
)):
73 result
.append( (pre
,pos
,value
) )
76 self
.assertEqual( result
, self
.expect_pileup_1
)
78 def test_pileup_2(self
):
79 pileup
= se_all_in_one_pileup( self
.plus_pos
, self
.minus_pos
,
80 self
.param_2
["five_shift"],
81 self
.param_2
["three_shift"],
83 self
.param_2
["scale_factor"],
84 self
.param_2
["baseline"] )
87 pnext
= iter(p
).__next
__
88 vnext
= iter(v
).__next
__
90 for i
in range(len(p
)):
93 result
.append( (pre
,pos
,value
) )
96 self
.assertEqual( result
, self
.expect_pileup_2
)
98 class Test_Quick_Pileup(unittest
.TestCase
):
99 """Unittest for pileup functions in Pileup.pyx for quick-pileup.
101 Function to test: quick_pileup
105 self
.start_pos
= np
.array( ( 0, 1, 3, 3, 4, 5 ), dtype
="int32")
106 self
.end_pos
= np
.array( ( 5, 6, 8, 8, 9, 10 ), dtype
="int32")
107 # expected result from quick_pileup: ( start, end, value )
108 self
.param_1
= { "scale_factor": 0.5,
110 self
.expect_pileup_1
= \
119 def test_pileup_1(self
):
120 pileup
= quick_pileup ( self
.start_pos
, self
.end_pos
,
121 self
.param_1
["scale_factor"],
122 self
.param_1
["baseline"] )
125 pnext
= iter(p
).__next
__
126 vnext
= iter(v
).__next
__
128 for i
in range(len(p
)):
131 result
.append( (pre
,pos
,value
) )
134 self
.assertEqual( result
, self
.expect_pileup_1
)
136 class Test_Naive_Pileup(unittest
.TestCase
):
137 """Unittest for pileup functions in Pileup.pyx for naive-quick-pileup.
139 Function to test: naive_quick_pileup
143 self
.pos
= np
.array( ( 2, 3, 5, 5, 6, 7 ), dtype
="int32")
144 # expected result from naive_quick_pileup: ( start, end, value )
145 self
.param_1
= { "extension": 2 }
146 self
.expect_pileup_1
= \
153 def test_pileup_1(self
):
154 pileup
= naive_quick_pileup ( self
.pos
,
155 self
.param_1
["extension"] )
159 pnext
= iter(p
).__next
__
160 vnext
= iter(v
).__next
__
162 for i
in range(len(p
)):
165 result
.append( (pre
,pos
,value
) )
168 self
.assertEqual( result
, self
.expect_pileup_1
)
170 class Test_Over_Two_PV_Array(unittest
.TestCase
):
171 """Unittest for over_two_pv_array function
173 Function to test: over_two_pv_array
177 self
.pv1
= [ np
.array( ( 2, 5, 7, 8, 9, 12 ), dtype
="int32" ),\
178 np
.array( ( 1, 2, 3, 4, 3, 2), dtype
="float32" ) ]
179 self
.pv2
= [ np
.array( ( 1, 4, 6, 8, 10, 11 ), dtype
="int32" ),\
180 np
.array( ( 5, 3, 2, 1, 0, 3), dtype
="float32" ) ]
181 # expected result from over_two_pv_array: ( start, end, value )
182 self
.expect_pv_max
= \
193 self
.expect_pv_min
= \
204 self
.expect_pv_mean
= \
217 pileup
= over_two_pv_array ( self
.pv1
, self
.pv2
, func
="max" )
221 pnext
= iter(p
).__next
__
222 vnext
= iter(v
).__next
__
224 for i
in range(len(p
)):
227 result
.append( (pre
,pos
,value
) )
230 self
.assertEqual( result
, self
.expect_pv_max
)
233 pileup
= over_two_pv_array ( self
.pv1
, self
.pv2
, func
="min" )
237 pnext
= iter(p
).__next
__
238 vnext
= iter(v
).__next
__
240 for i
in range(len(p
)):
243 result
.append( (pre
,pos
,value
) )
246 self
.assertEqual( result
, self
.expect_pv_min
)
249 pileup
= over_two_pv_array ( self
.pv1
, self
.pv2
, func
="mean" )
253 pnext
= iter(p
).__next
__
254 vnext
= iter(v
).__next
__
256 for i
in range(len(p
)):
259 result
.append( (pre
,pos
,value
) )
262 self
.assertEqual( result
, self
.expect_pv_mean
)
264 class Test_PileupV2_PE(unittest
.TestCase
):
265 """Unittest for pileup functions in PileupV2.pyx.
267 Function to test: pileup_from_LR
271 self
.LR_array1
= np
.array( [ (1,5), (2,6), (4,8), (4,8), (5,9), (6,10),
272 (12,14), (13,17), (14,18), (17,19) ], dtype
= [('l','int32'),('r','int32')])
273 # expected result from pileup_from_LR: ( end, value )
274 self
.expect_pileup_1
= np
.array( [ ( 1, 0.0 ),
284 dtype
=[ ( 'p', 'uint32' ), ( 'v', 'float32' ) ] )
285 # with log2(length) as weight
286 self
.expect_pileup_2
= np
.array( [ ( 1, 0.0 ),
298 dtype
=[ ( 'p', 'uint32' ), ( 'v', 'float32' ) ] )
def test_pileup_1(self):
    """Pile up the (l, r) interval array without a weight function.

    The result of pileup_from_LR must equal the precomputed
    ``expect_pileup_1`` array.
    """
    observed = pileup_from_LR(self.LR_array1)
    np.testing.assert_equal(observed, self.expect_pileup_1)
def test_pileup_2(self):
    """Pile up the (l, r) interval array with log2(length) as weight.

    The result of pileup_from_LR, given the weight function as second
    argument, must equal the precomputed ``expect_pileup_2`` array.
    """
    def log2_of_length(x, y):
        # weight = log2 of the distance between the two arguments
        return log2(y - x)

    observed = pileup_from_LR(self.LR_array1, log2_of_length)
    np.testing.assert_equal(observed, self.expect_pileup_2)
308 class Test_PileupV2_SE(unittest
.TestCase
):
309 """Unittest for pileup functions in PileupV2.pyx.
311 Function to test: pileup_from_PN
315 self
.P
= np
.array( ( 0, 1, 3, 3, 4, 5 ), dtype
="int32") #plus strand pos
316 self
.N
= np
.array( ( 5, 6, 8, 8, 9, 10 ), dtype
="int32") #minus strand pos
317 # expected result from pileup_from_PN: ( end, value )
319 self
.expect_pileup_1
= np
.array( [ ( 1, 1.0 ),
327 dtype
=[ ( 'p', 'uint32' ), ( 'v', 'float32' ) ] )\
def test_pileup_1(self):
    """Pile up the plus/minus strand position arrays with ``extsize``.

    The result of pileup_from_PN must equal the precomputed
    ``expect_pileup_1`` array.
    """
    observed = pileup_from_PN(self.P, self.N, self.extsize)
    np.testing.assert_equal(observed, self.expect_pileup_1)