2 # Time-stamp: <2024-10-18 15:30:21 Tao Liu>
6 from numpy
.testing
import assert_array_equal
# assert_equal, assert_almost_equal
9 from MACS3
.Signal
.ScoreTrack
import ScoreTrackII
, TwoConditionScores
10 from MACS3
.Signal
.BedGraph
import bedGraphTrackI
13 class Test_TwoConditionScores(unittest
.TestCase
):
15 self
.t1bdg
= bedGraphTrackI()
16 self
.t2bdg
= bedGraphTrackI()
17 self
.c1bdg
= bedGraphTrackI()
18 self
.c2bdg
= bedGraphTrackI()
19 self
.test_regions1
= [(b
"chrY", 0, 70, 0.00, 0.01),
20 (b
"chrY", 70, 80, 7.00, 0.5),
21 (b
"chrY", 80, 150, 0.00, 0.02)]
22 self
.test_regions2
= [(b
"chrY", 0, 75, 20.0, 4.00),
23 (b
"chrY", 75, 90, 35.0, 6.00),
24 (b
"chrY", 90, 150, 10.0, 15.00)]
25 for a
in self
.test_regions1
:
26 self
.t1bdg
.safe_add_loc(a
[0], a
[1], a
[2], a
[3])
27 self
.c1bdg
.safe_add_loc(a
[0], a
[1], a
[2], a
[4])
29 for a
in self
.test_regions2
:
30 self
.t2bdg
.safe_add_loc(a
[0], a
[1], a
[2], a
[3])
31 self
.c2bdg
.safe_add_loc(a
[0], a
[1], a
[2], a
[4])
33 self
.twoconditionscore
= TwoConditionScores(self
.t1bdg
,
39 self
.twoconditionscore
.build()
40 self
.twoconditionscore
.finalize()
41 (self
.cat1
, self
.cat2
, self
.cat3
) = self
.twoconditionscore
.call_peaks(min_length
=10, max_gap
=10, cutoff
=3)
44 class Test_ScoreTrackII(unittest
.TestCase
):
47 # for initiate scoretrack
48 self
.test_regions1
= [(b
"chrY", 10, 100, 10),
49 (b
"chrY", 60, 10, 10),
50 (b
"chrY", 110, 15, 20),
51 (b
"chrY", 160, 5, 20),
52 (b
"chrY", 210, 20, 5)]
55 # for different scoring method
56 self
.p_result
= [60.49, 0.38, 0.08, 0.0, 6.41] # -log10(p-value), pseudo count 1 added
57 self
.q_result
= [58.17, 0.0, 0.0, 0.0, 5.13] # -log10(q-value) from BH, pseudo count 1 added
58 self
.l_result
= [58.17, 0.0, -0.28, -3.25, 4.91] # log10 likelihood ratio, pseudo count 1 added
59 self
.f_result
= [0.96, 0.00, -0.12, -0.54, 0.54] # note, pseudo count 1 would be introduced.
60 self
.d_result
= [90.00, 0, -5.00, -15.00, 15.00]
61 self
.m_result
= [10.00, 1.00, 1.50, 0.50, 2.00]
63 self
.norm_T
= np
.array([[ 10, 100, 20, 0],
67 [210, 20, 10, 0]]).transpose()
68 self
.norm_C
= np
.array([[ 10, 50, 10, 0],
72 [210, 10, 5, 0]]).transpose()
73 self
.norm_M
= np
.array([[ 10, 10, 2, 0],
77 [210, 2, 1, 0]]).transpose()
78 self
.norm_N
= np
.array([[ 10, 100, 10, 0], # note precision lost
82 [210, 20, 5, 0]]).transpose()
85 self
.bdg1
= """chrY 0 10 100.00000
91 self
.bdg2
= """chrY 0 60 10.00000
95 self
.bdg3
= """chrY 0 10 60.48912
102 self
.peak1
= """chrY 0 60 MACS_peak_1 60.4891
103 chrY 160 210 MACS_peak_2 6.40804
105 self
.summit1
= """chrY 5 6 MACS_peak_1 60.4891
106 chrY 185 186 MACS_peak_2 6.40804
108 self
.xls1
="""chr start end length abs_summit pileup -log10(pvalue) fold_enrichment -log10(qvalue) name
109 chrY 1 60 60 6 100 63.2725 9.18182 -1 MACS_peak_1
110 chrY 161 210 50 186 20 7.09102 3.5 -1 MACS_peak_2
113 def assertListAlmostEqual(self
, a
, b
, places
=2):
114 return all([self
.assertAlmostEqual(x
, y
, places
=places
) for (x
, y
) in zip(a
, b
)])
116 def test_compute_scores(self
):
117 s1
= ScoreTrackII(self
.treat_edm
, self
.ctrl_edm
)
118 s1
.add_chromosome(b
"chrY", 5)
119 for a
in self
.test_regions1
:
120 s1
.add(a
[0], a
[1], a
[2], a
[3])
122 s1
.set_pseudocount(1.0)
124 s1
.change_score_method(ord('p'))
125 r
= s1
.get_data_by_chr(b
"chrY")
126 self
.assertListAlmostEqual([round(x
, 2) for x
in r
[3]], self
.p_result
)
128 s1
.change_score_method(ord('q'))
129 r
= s1
.get_data_by_chr(b
"chrY")
130 self
.assertListAlmostEqual([round(x
, 2) for x
in list(r
[3])], self
.q_result
)
132 s1
.change_score_method(ord('l'))
133 r
= s1
.get_data_by_chr(b
"chrY")
134 self
.assertListAlmostEqual([round(x
, 2) for x
in list(r
[3])], self
.l_result
)
136 s1
.change_score_method(ord('f'))
137 r
= s1
.get_data_by_chr(b
"chrY")
138 self
.assertListAlmostEqual([round(x
, 2) for x
in list(r
[3])], self
.f_result
)
140 s1
.change_score_method(ord('d'))
141 r
= s1
.get_data_by_chr(b
"chrY")
142 self
.assertListAlmostEqual([round(x
, 2) for x
in list(r
[3])], self
.d_result
)
144 s1
.change_score_method(ord('m'))
145 r
= s1
.get_data_by_chr(b
"chrY")
146 self
.assertListAlmostEqual([round(x
, 2) for x
in list(r
[3])], self
.m_result
)
148 def test_normalize(self
):
149 s1
= ScoreTrackII(self
.treat_edm
, self
.ctrl_edm
)
150 s1
.add_chromosome(b
"chrY", 5)
151 for a
in self
.test_regions1
:
152 s1
.add(a
[0], a
[1], a
[2], a
[3])
154 s1
.change_normalization_method(ord('T'))
155 r
= s1
.get_data_by_chr(b
"chrY")
156 assert_array_equal(r
, self
.norm_T
)
158 s1
.change_normalization_method(ord('C'))
159 r
= s1
.get_data_by_chr(b
"chrY")
160 assert_array_equal(r
, self
.norm_C
)
162 s1
.change_normalization_method(ord('M'))
163 r
= s1
.get_data_by_chr(b
"chrY")
164 assert_array_equal(r
, self
.norm_M
)
166 s1
.change_normalization_method(ord('N'))
167 r
= s1
.get_data_by_chr(b
"chrY")
168 assert_array_equal(r
, self
.norm_N
)
170 def test_writebedgraph(self
):
171 s1
= ScoreTrackII(self
.treat_edm
, self
.ctrl_edm
)
172 s1
.add_chromosome(b
"chrY", 5)
173 for a
in self
.test_regions1
:
174 s1
.add(a
[0], a
[1], a
[2], a
[3])
176 s1
.change_score_method(ord('p'))
178 strio
= io
.StringIO()
179 s1
.write_bedGraph(strio
, "NAME", "DESC", 1)
180 self
.assertEqual(strio
.getvalue(), self
.bdg1
)
181 strio
= io
.StringIO()
182 s1
.write_bedGraph(strio
, "NAME", "DESC", 2)
183 self
.assertEqual(strio
.getvalue(), self
.bdg2
)
184 strio
= io
.StringIO()
185 s1
.write_bedGraph(strio
, "NAME", "DESC", 3)
186 self
.assertEqual(strio
.getvalue(), self
.bdg3
)
188 def test_callpeak(self
):
189 s1
= ScoreTrackII(self
.treat_edm
, self
.ctrl_edm
)
190 s1
.add_chromosome(b
"chrY", 5)
191 for a
in self
.test_regions1
:
192 s1
.add(a
[0], a
[1], a
[2], a
[3])
194 s1
.change_score_method(ord('p'))
195 p
= s1
.call_peaks(cutoff
=0.10, min_length
=10, max_gap
=10)
196 strio
= io
.StringIO()
197 p
.write_to_bed(strio
, trackline
=False)
198 self
.assertEqual(strio
.getvalue(), self
.peak1
)
200 strio
= io
.StringIO()
201 p
.write_to_summit_bed(strio
, trackline
=False)
202 self
.assertEqual(strio
.getvalue(), self
.summit1
)
204 strio
= io
.StringIO()
205 p
.write_to_xls(strio
)
206 self
.assertEqual(strio
.getvalue(), self
.xls1
)