1 #!/usr/bin/env python2.6
2 #Copyright (C) 2009-2010 :
3 # Gabes Jean, naparuba@gmail.com
4 # Gerhard Lausser, Gerhard.Lausser@consol.de
6 #This file is part of Shinken.
8 #Shinken is free software: you can redistribute it and/or modify
9 #it under the terms of the GNU Affero General Public License as published by
10 #the Free Software Foundation, either version 3 of the License, or
11 #(at your option) any later version.
13 #Shinken is distributed in the hope that it will be useful,
14 #but WITHOUT ANY WARRANTY; without even the implied warranty of
15 #MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 #GNU Affero General Public License for more details.
18 #You should have received a copy of the GNU Affero General Public License
19 #along with Shinken. If not, see <http://www.gnu.org/licenses/>.
22 # This file is used to test reading and processing of config files
import copy

from shinken_test import *
class TestConfig(ShinkenTest):
    #setUp is in shinken_test
    #
    #Every test below works on the service "test_ok_0" of host "test_host_0".
    #The debug prints use the single-argument print(...) form, which gives the
    #same output under Python 2 and still parses under Python 3.

    #Return the service all the tests of this class work on
    def get_svc(self):
        return self.sched.services.find_srv_by_name_and_hostname("test_host_0", "test_ok_0")

    #Look if get_*_name return the good result
    def test_get_name(self):
        svc = self.get_svc()
        print(svc.get_dbg_name())
        self.assertEqual(svc.get_name(), 'test_ok_0')
        self.assertEqual(svc.get_dbg_name(), 'test_host_0/test_ok_0')

    #getstate should be with all properties in dict class + id
    #check also the setstate
    def test___getstate__(self):
        svc = self.get_svc()
        cls = svc.__class__
        #We get the state
        state = svc.__getstate__()
        #Check it has the good length
        self.assertEqual(len(state),
                         len(cls.properties) + len(cls.running_properties) + 1)
        #We keep a copy of the service to compare with
        svc_copy = copy.copy(svc)
        #reset the state in the original service
        svc.__setstate__(state)
        #And it should be the same as before :)
        for p in cls.properties:
            self.assertEqual(getattr(svc_copy, p), getattr(svc, p))

    #Look if it can detect all incorrect cases
    def test_is_correct(self):
        svc = self.get_svc()

        #first it's ok
        self.assertEqual(svc.is_correct(), True)

        #Now try to delete a required property
        max_check_attempts = svc.max_check_attempts
        del svc.max_check_attempts
        self.assertEqual(svc.is_correct(), False)
        svc.max_check_attempts = max_check_attempts

        #Now the special cases

        #no contacts with notification enabled is a problem
        svc.notifications_enabled = True
        contacts = svc.contacts
        contact_groups = svc.contact_groups
        del svc.contacts
        del svc.contact_groups
        self.assertEqual(svc.is_correct(), False)
        #and with disabled it's ok
        svc.notifications_enabled = False
        self.assertEqual(svc.is_correct(), True)
        svc.contacts = contacts
        svc.contact_groups = contact_groups

        svc.notifications_enabled = True
        self.assertEqual(svc.is_correct(), True)

        #no check command
        check_command = svc.check_command
        del svc.check_command
        self.assertEqual(svc.is_correct(), False)
        svc.check_command = check_command
        self.assertEqual(svc.is_correct(), True)

        #no notification_interval
        notification_interval = svc.notification_interval
        del svc.notification_interval
        self.assertEqual(svc.is_correct(), False)
        svc.notification_interval = notification_interval
        self.assertEqual(svc.is_correct(), True)

        #no host
        #NOTE(review): the attribute removed for this last case is assumed to
        #be the host link -- confirm against Service.is_correct()
        host = svc.host
        del svc.host
        self.assertEqual(svc.is_correct(), False)
        svc.host = host
        self.assertEqual(svc.is_correct(), True)

    #Look for set/unset impacted states (unknown)
    def test_impact_state(self):
        svc = self.get_svc()
        ori_state = svc.state
        ori_state_id = svc.state_id
        svc.set_impact_state()
        self.assertEqual(svc.state, 'UNKNOWN')
        self.assertEqual(svc.state_id, 3)
        svc.unset_impact_state()
        self.assertEqual(svc.state, ori_state)
        self.assertEqual(svc.state_id, ori_state_id)

    #Look if each exit status gives the good state, state id and is_state()
    def test_set_state_from_exit_status(self):
        svc = self.get_svc()

        #First OK
        svc.set_state_from_exit_status(0)
        self.assertEqual(svc.state, 'OK')
        self.assertEqual(svc.state_id, 0)
        self.assertEqual(svc.is_state('OK'), True)
        self.assertEqual(svc.is_state('o'), True)

        #Then WARNING
        svc.set_state_from_exit_status(1)
        self.assertEqual(svc.state, 'WARNING')
        self.assertEqual(svc.state_id, 1)
        self.assertEqual(svc.is_state('WARNING'), True)
        self.assertEqual(svc.is_state('w'), True)

        #Then CRITICAL
        svc.set_state_from_exit_status(2)
        self.assertEqual(svc.state, 'CRITICAL')
        self.assertEqual(svc.state_id, 2)
        self.assertEqual(svc.is_state('CRITICAL'), True)
        self.assertEqual(svc.is_state('c'), True)

        #Then UNKNOWN
        svc.set_state_from_exit_status(3)
        self.assertEqual(svc.state, 'UNKNOWN')
        self.assertEqual(svc.state_id, 3)
        self.assertEqual(svc.is_state('UNKNOWN'), True)
        self.assertEqual(svc.is_state('u'), True)

        #And something else :) It is expected to end as CRITICAL
        svc.set_state_from_exit_status(99)
        self.assertEqual(svc.state, 'CRITICAL')
        self.assertEqual(svc.state_id, 2)
        self.assertEqual(svc.is_state('CRITICAL'), True)
        self.assertEqual(svc.is_state('c'), True)

    #Check if the check_freshness is doing its job
    def test_check_freshness(self):
        #We want an eventhandler (the perfdata command) to be put in the actions dict
        #after we got a service check
        svc = self.get_svc()
        svc.checks_in_progress = []
        svc.act_depend_of = [] # no hostchecks on critical checkresults
        #--------------------------------------------------------------
        # initialize host/service state
        #--------------------------------------------------------------
        #We do not want to be just a string but a real command
        print("Additonal freshness latency %s"
              % (svc.__class__.additional_freshness_latency,))
        self.scheduler_loop(1, [[svc, 0, 'OK | bibi=99%']])
        print("Addi : %s %s %s" % (svc.last_state_update,
                                   svc.freshness_threshold,
                                   svc.check_freshness))
        #By default check freshness is set at false, so no new checks
        self.assertEqual(len(svc.actions), 0)
        svc.do_check_freshness()
        self.assertEqual(len(svc.actions), 0)

        #We make it 10s less than it was
        svc.last_state_update = svc.last_state_update - 10

        #Now we activate it, with a too small value (now - 10s is still higher
        #than now - (1 - 15, the addition time))
        svc.check_freshness = True
        svc.freshness_threshold = 1
        print("Addi: %s %s %s" % (svc.last_state_update,
                                  svc.freshness_threshold,
                                  svc.check_freshness))
        svc.do_check_freshness()
        self.assertEqual(len(svc.actions), 0)

        #Ok, now, we remove again 10s. Here we will see the new entry
        svc.last_state_update = svc.last_state_update - 10
        svc.do_check_freshness()
        self.assertEqual(len(svc.actions), 1)
        #And we check for the message in the log too
        self.assertTrue(self.log_match(1, 'Warning: The results of service'))

    #Look if the criticity value is the expected one
    def test_criticity_value(self):
        svc = self.get_svc()
        #This service inherits the importance value from its father, 5
        self.assertEqual(svc.criticity, 5)

    #Look if the service is in the servicegroup
    def test_servicegroup(self):
        sg = self.sched.servicegroups.find_by_name("servicegroup_01")
        self.assertTrue(sg is not None)
        svc = self.get_svc()
        self.assertTrue(svc in sg.members)
        self.assertTrue(sg in svc.servicegroups)

    #Look at the good value of the last_hard_state_change
    def test_service_last_hard_state(self):
        #We want an eventhandler (the perfdata command) to be put in the actions dict
        #after we got a service check
        svc = self.get_svc()
        svc.checks_in_progress = []
        svc.act_depend_of = [] # no hostchecks on critical checkresults
        #--------------------------------------------------------------
        # initialize host/service state
        #--------------------------------------------------------------
        #We do not want to be just a string but a real command
        self.scheduler_loop(1, [[svc, 0, 'OK | bibi=99%']])
        print("FUCK %s" % (svc.last_hard_state_change,))
        orig = svc.last_hard_state_change
        self.assertEqual(svc.last_hard_state, 'OK')

        #still OK: the change date must not move
        self.scheduler_loop(1, [[svc, 0, 'OK | bibi=99%']])
        self.assertEqual(svc.last_hard_state_change, orig)
        self.assertEqual(svc.last_hard_state, 'OK')

        #now error but still SOFT
        self.scheduler_loop(1, [[svc, 2, 'CRITICAL | bibi=99%']])
        print("FUCK %s" % (svc.state_type,))
        self.assertEqual(svc.last_hard_state_change, orig)
        self.assertEqual(svc.last_hard_state, 'OK')

        #second error: the hard state must change now. The change date is
        #presumably stamped somewhere during the loop, so we accept any second
        #between just before and just after it instead of one sampled instant
        #(which fails whenever the clock ticks in between)
        before = int(time.time())
        self.scheduler_loop(1, [[svc, 2, 'CRITICAL | bibi=99%']])
        after = int(time.time())
        print("FUCK %s" % (svc.state_type,))
        self.assertTrue(before <= svc.last_hard_state_change <= after)
        self.assertEqual(svc.last_hard_state, 'CRITICAL')
        print("Last hard state id %s" % (svc.last_hard_state_id,))
        self.assertEqual(svc.last_hard_state_id, 2)

    # Check if the autoslots are filled like they should
    def test_autoslots(self):
        svc = self.get_svc()
        self.assertTrue("check_period" not in svc.__dict__)

    # Check if the parent/child dependencies are filled like they should
    def test_parent_child_dep_list(self):
        svc = self.get_svc()
        # Look if our host is a parent
        self.assertTrue(svc.host in svc.parent_dependencies)
        # and if we are a child of it
        self.assertTrue(svc in svc.host.child_dependencies)
274 if __name__
== '__main__':