Fix: (Grégory Starck) french name in variables.
[shinken.git] / test / test_downtimes.py
blobd39d9594e5ae1de52237cf8ed581fe15189269b7
1 #!/usr/bin/env python2.6
2 #Copyright (C) 2009-2010 :
3 # Gabes Jean, naparuba@gmail.com
4 # Gerhard Lausser, Gerhard.Lausser@consol.de
6 #This file is part of Shinken.
8 #Shinken is free software: you can redistribute it and/or modify
9 #it under the terms of the GNU Affero General Public License as published by
10 #the Free Software Foundation, either version 3 of the License, or
11 #(at your option) any later version.
13 #Shinken is distributed in the hope that it will be useful,
14 #but WITHOUT ANY WARRANTY; without even the implied warranty of
15 #MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 #GNU Affero General Public License for more details.
18 #You should have received a copy of the GNU Affero General Public License
19 #along with Shinken. If not, see <http://www.gnu.org/licenses/>.
22 # This file is used to test host- and service-downtimes.
25 from shinken_test import *
27 class TestConfig(ShinkenTest):
29 def test_schedule_fixed_svc_downtime(self):
30 self.print_header()
31 # schedule a 2-minute downtime
32 # downtime must be active
33 # consume a good result, sleep for a minute
34 # downtime must be active
35 # consume a bad result
36 # downtime must be active
37 # no notification must be found in broks
38 duration = 600
39 now = time.time()
40 # downtime valid for the next 2 minutes
41 cmd = "[%lu] SCHEDULE_SVC_DOWNTIME;test_host_0;test_ok_0;%d;%d;1;0;%d;lausser;blablub" % (now, now, now + duration, duration)
42 self.sched.run_external_command(cmd)
43 svc = self.sched.services.find_srv_by_name_and_hostname("test_host_0", "test_ok_0")
44 svc.checks_in_progress = []
45 svc.act_depend_of = [] # no hostchecks on critical checkresults
46 time.sleep(20)
47 self.scheduler_loop(1, [[svc, 0, 'OK']])
49 print "downtime was scheduled. check its activity and the comment"
50 self.assert_(len(self.sched.downtimes) == 1)
51 self.assert_(len(svc.downtimes) == 1)
52 self.assert_(svc.downtimes[0] in self.sched.downtimes.values())
53 self.assert_(svc.downtimes[0].fixed)
54 self.assert_(svc.downtimes[0].is_in_effect)
55 self.assert_(not svc.downtimes[0].can_be_deleted)
56 self.assert_(len(self.sched.comments) == 1)
57 self.assert_(len(svc.comments) == 1)
58 self.assert_(svc.comments[0] in self.sched.comments.values())
59 self.assert_(svc.downtimes[0].comment_id == svc.comments[0].id)
61 self.scheduler_loop(1, [[svc, 0, 'OK']])
63 print "good check was launched, downtime must be active"
64 self.assert_(len(self.sched.downtimes) == 1)
65 self.assert_(len(svc.downtimes) == 1)
66 self.assert_(svc.downtimes[0] in self.sched.downtimes.values())
67 self.assert_(svc.in_scheduled_downtime)
68 self.assert_(svc.downtimes[0].fixed)
69 self.assert_(svc.downtimes[0].is_in_effect)
70 self.assert_(not svc.downtimes[0].can_be_deleted)
72 self.scheduler_loop(1, [[svc, 2, 'BAD']])
74 print "bad check was launched (SOFT;1), downtime must be active"
75 self.assert_(len(self.sched.downtimes) == 1)
76 self.assert_(len(svc.downtimes) == 1)
77 self.assert_(svc.downtimes[0] in self.sched.downtimes.values())
78 self.assert_(svc.in_scheduled_downtime)
79 self.assert_(svc.downtimes[0].fixed)
80 self.assert_(svc.downtimes[0].is_in_effect)
81 self.assert_(not svc.downtimes[0].can_be_deleted)
83 # now the state changes to hard
84 self.scheduler_loop(1, [[svc, 2, 'BAD']])
86 print "bad check was launched (HARD;2), downtime must be active"
87 print svc.downtimes[0]
88 self.assert_(len(self.sched.downtimes) == 1)
89 self.assert_(len(svc.downtimes) == 1)
90 self.assert_(svc.downtimes[0] in self.sched.downtimes.values())
91 self.assert_(svc.in_scheduled_downtime)
92 self.assert_(svc.downtimes[0].fixed)
93 self.assert_(svc.downtimes[0].is_in_effect)
94 self.assert_(not svc.downtimes[0].can_be_deleted)
96 scheduled_downtime_depth = svc.scheduled_downtime_depth
97 cmd = "[%lu] DEL_SVC_DOWNTIME;%d" % (now, svc.downtimes[0].id)
98 self.sched.run_external_command(cmd)
99 self.assert_(len(self.sched.downtimes) == 1)
100 self.assert_(len(svc.downtimes) == 1)
101 self.assert_(not svc.in_scheduled_downtime)
102 self.assert_(svc.scheduled_downtime_depth < scheduled_downtime_depth)
103 self.assert_(svc.downtimes[0].fixed)
104 self.assert_(not svc.downtimes[0].is_in_effect)
105 self.assert_(svc.downtimes[0].can_be_deleted)
106 self.assert_(len(self.sched.comments) == 1)
107 self.assert_(len(svc.comments) == 1)
109 # now a notification must be sent
110 self.scheduler_loop(1, [[svc, 2, 'BAD']])
111 # downtimes must have been deleted now
112 self.assert_(len(self.sched.downtimes) == 0)
113 self.assert_(len(svc.downtimes) == 0)
114 self.assert_(len(self.sched.comments) == 0)
115 self.assert_(len(svc.comments) == 0)
117 def test_schedule_flexible_svc_downtime(self):
118 self.print_header()
119 #----------------------------------------------------------------
120 # schedule a flexible downtime of 3 minutes for the host
121 #----------------------------------------------------------------
122 duration = 180
123 now = time.time()
124 cmd = "[%lu] SCHEDULE_SVC_DOWNTIME;test_host_0;test_ok_0;%d;%d;0;0;%d;lausser;blablub" % (now, now, now + duration, duration)
125 self.sched.run_external_command(cmd)
126 svc = self.sched.services.find_srv_by_name_and_hostname("test_host_0", "test_ok_0")
127 svc.checks_in_progress = []
128 svc.act_depend_of = [] # no hostchecks on critical checkresults
129 time.sleep(20)
130 #----------------------------------------------------------------
131 # check if a downtime object exists (scheduler and service)
132 # check if the downtime is still inactive
133 #----------------------------------------------------------------
134 self.assert_(len(self.sched.downtimes) == 1)
135 self.assert_(len(svc.downtimes) == 1)
136 self.assert_(svc.downtimes[0] in self.sched.downtimes.values())
137 self.assert_(not svc.downtimes[0].fixed)
138 self.assert_(not svc.downtimes[0].is_in_effect)
139 self.assert_(not svc.downtimes[0].can_be_deleted)
140 self.assert_(len(self.sched.comments) == 1)
141 self.assert_(len(svc.comments) == 1)
142 self.assert_(svc.comments[0] in self.sched.comments.values())
143 self.assert_(svc.downtimes[0].comment_id == svc.comments[0].id)
144 #----------------------------------------------------------------
145 # run the service and return an OK status
146 # check if the downtime is still inactive
147 #----------------------------------------------------------------
148 self.scheduler_loop(1, [[svc, 0, 'OK']])
149 self.assert_(len(self.sched.downtimes) == 1)
150 self.assert_(len(svc.downtimes) == 1)
151 self.assert_(svc.downtimes[0] in self.sched.downtimes.values())
152 self.assert_(not svc.in_scheduled_downtime)
153 self.assert_(not svc.downtimes[0].fixed)
154 self.assert_(not svc.downtimes[0].is_in_effect)
155 self.assert_(not svc.downtimes[0].can_be_deleted)
156 time.sleep(61)
157 #----------------------------------------------------------------
158 # run the service twice to get a soft critical status
159 # check if the downtime is still inactive
160 #----------------------------------------------------------------
161 self.scheduler_loop(1, [[svc, 2, 'BAD']])
162 self.assert_(len(self.sched.downtimes) == 1)
163 self.assert_(len(svc.downtimes) == 1)
164 self.assert_(svc.downtimes[0] in self.sched.downtimes.values())
165 self.assert_(not svc.in_scheduled_downtime)
166 self.assert_(not svc.downtimes[0].fixed)
167 self.assert_(not svc.downtimes[0].is_in_effect)
168 self.assert_(not svc.downtimes[0].can_be_deleted)
169 time.sleep(61)
170 #----------------------------------------------------------------
171 # run the service again to get a hard critical status
172 # check if the downtime is active now
173 #----------------------------------------------------------------
174 self.scheduler_loop(1, [[svc, 2, 'BAD']])
175 self.assert_(len(self.sched.downtimes) == 1)
176 self.assert_(len(svc.downtimes) == 1)
177 self.assert_(svc.downtimes[0] in self.sched.downtimes.values())
178 self.assert_(svc.in_scheduled_downtime)
179 self.assert_(not svc.downtimes[0].fixed)
180 self.assert_(svc.downtimes[0].is_in_effect)
181 self.assert_(not svc.downtimes[0].can_be_deleted)
182 #----------------------------------------------------------------
183 # cancel the downtime
184 # check if the downtime is inactive now and can be deleted
185 #----------------------------------------------------------------
186 scheduled_downtime_depth = svc.scheduled_downtime_depth
187 cmd = "[%lu] DEL_SVC_DOWNTIME;%d" % (now, svc.downtimes[0].id)
188 self.sched.run_external_command(cmd)
189 self.assert_(len(self.sched.downtimes) == 1)
190 self.assert_(len(svc.downtimes) == 1)
191 self.assert_(not svc.in_scheduled_downtime)
192 self.assert_(svc.scheduled_downtime_depth < scheduled_downtime_depth)
193 self.assert_(not svc.downtimes[0].fixed)
194 self.assert_(not svc.downtimes[0].is_in_effect)
195 self.assert_(svc.downtimes[0].can_be_deleted)
196 self.assert_(len(self.sched.comments) == 1)
197 self.assert_(len(svc.comments) == 1)
198 time.sleep(61)
199 #----------------------------------------------------------------
200 # run the service again with a critical status
201 # the downtime must have disappeared
202 # a notification must be sent
203 #----------------------------------------------------------------
204 self.scheduler_loop(1, [[svc, 2, 'BAD']])
205 self.assert_(len(self.sched.downtimes) == 0)
206 self.assert_(len(svc.downtimes) == 0)
207 self.assert_(len(self.sched.comments) == 0)
208 self.assert_(len(svc.comments) == 0)
209 self.show_logs()
210 self.show_actions()
213 def test_schedule_fixed_host_downtime(self):
214 self.print_header()
215 # schedule a 2-minute downtime
216 # downtime must be active
217 # consume a good result, sleep for a minute
218 # downtime must be active
219 # consume a bad result
220 # downtime must be active
221 # no notification must be found in broks
222 host = self.sched.hosts.find_by_name("test_host_0")
223 host.checks_in_progress = []
224 host.act_depend_of = [] # ignore the router
225 svc = self.sched.services.find_srv_by_name_and_hostname("test_host_0", "test_ok_0")
226 svc.checks_in_progress = []
227 svc.act_depend_of = [] # no hostchecks on critical checkresults
228 print "test_schedule_fixed_host_downtime initialized"
229 self.show_logs()
230 self.show_actions()
231 self.assert_(self.count_logs() == 0)
232 self.assert_(self.count_actions() == 0)
233 #----------------------------------------------------------------
234 # schedule a downtime of 10 minutes for the host
235 #----------------------------------------------------------------
236 duration = 600
237 now = time.time()
238 # fixed downtime valid for the next 10 minutes
239 cmd = "[%lu] SCHEDULE_HOST_DOWNTIME;test_host_0;%d;%d;1;0;%d;lausser;blablub" % (now, now, now + duration, duration)
241 self.sched.run_external_command(cmd)
242 self.sched.update_downtimes_and_comments()
243 print "Launch scheduler loop"
244 self.scheduler_loop(1,[], do_sleep=False) # push the downtime notification
245 self.show_actions()
246 print "Launch worker loop"
247 #self.worker_loop()
248 self.show_actions()
249 print "After both launchs"
250 time.sleep(20)
251 #----------------------------------------------------------------
252 # check if a downtime object exists (scheduler and host)
253 #----------------------------------------------------------------
254 self.assert_(len(self.sched.downtimes) == 1)
255 self.assert_(len(host.downtimes) == 1)
256 self.assert_(host.downtimes[0] in self.sched.downtimes.values())
257 self.assert_(host.downtimes[0].fixed)
258 self.assert_(host.downtimes[0].is_in_effect)
259 self.assert_(not host.downtimes[0].can_be_deleted)
260 self.assert_(len(self.sched.comments) == 1)
261 self.assert_(len(host.comments) == 1)
262 self.assert_(host.comments[0] in self.sched.comments.values())
263 self.assert_(host.downtimes[0].comment_id == host.comments[0].id)
264 self.show_logs()
265 self.show_actions()
266 print "*****************************************************************************************************************************************************************Log matching:", self.get_log_match("STARTED*")
267 self.assert_(self.count_logs() == 2) # start downt, notif downt
268 #sys.exit(1)
269 self.assert_(self.count_actions() == 2) # notif" down
270 self.clear_logs()
271 self.clear_actions()
272 #----------------------------------------------------------------
273 # send the host to a hard DOWN state
274 # check log messages, (no) notifications and eventhandlers
275 #----------------------------------------------------------------
276 self.scheduler_loop(1, [[host, 2, 'DOWN']])
277 self.show_logs()
278 self.show_actions()
279 self.assert_(self.count_logs() == 2) # soft1, evt1
280 self.assert_(self.count_actions() == 1) # evt1
281 self.clear_logs()
283 self.scheduler_loop(1, [[host, 2, 'DOWN']])
284 self.show_logs()
285 self.show_actions()
286 self.assert_(self.count_logs() == 2) # soft2, evt2
287 self.assert_(self.count_actions() == 1) # evt2
288 self.clear_logs()
290 self.scheduler_loop(1, [[host, 2, 'DOWN']])
291 self.show_logs()
292 self.show_actions()
293 self.assert_(self.count_logs() == 2) # hard3, evt3
294 self.assert_(self.count_actions() == 2) # evt3, notif"
295 self.clear_logs()
297 # we have a notification, but this is blocked. it will stay in
298 # the actions queue because we have a notification_interval.
299 # it's called notif" because it is a master notification
300 print "DBG: host", host.state, host.state_type
301 self.scheduler_loop(1, [[host, 2, 'DOWN']], do_sleep=True)
302 print "DBG2: host", host.state, host.state_type
303 self.show_logs()
304 self.show_actions()
305 self.assert_(self.count_logs() == 0) #
306 self.assert_(self.count_actions() == 1) # notif"
307 self.clear_logs()
308 #----------------------------------------------------------------
309 # the host comes UP again
310 # check log messages, (no) notifications and eventhandlers
311 # a (recovery) notification was created, but has been blocked.
312 # we see it in the actions as a zombie
313 #----------------------------------------------------------------
314 self.scheduler_loop(1, [[host, 0, 'UP']], do_sleep=True)
315 self.show_logs()
316 self.show_actions()
317 self.assert_(self.count_logs() == 2) # hard3ok, evtok
318 self.assert_(self.count_actions() == 2) # evtok, notif"
319 self.clear_logs()
320 self.clear_actions()
323 def test_schedule_fixed_host_downtime_with_service(self):
324 self.print_header()
325 host = self.sched.hosts.find_by_name("test_host_0")
326 host.checks_in_progress = []
327 host.act_depend_of = [] # ignore the router
328 svc = self.sched.services.find_srv_by_name_and_hostname("test_host_0", "test_ok_0")
329 svc.checks_in_progress = []
330 svc.act_depend_of = [] # no hostchecks on critical checkresults
331 host.notification_interval = 0
332 svc.notification_interval = 0
333 self.show_logs()
334 self.show_actions()
335 self.assert_(self.count_logs() == 0)
336 self.assert_(self.count_actions() == 0)
337 #----------------------------------------------------------------
338 # schedule a downtime of 10 minutes for the host
339 #----------------------------------------------------------------
340 duration = 600
341 now = time.time()
342 cmd = "[%lu] SCHEDULE_HOST_DOWNTIME;test_host_0;%d;%d;1;0;%d;lausser;blablub" % (now, now, now + duration, duration)
343 self.sched.run_external_command(cmd)
344 self.sched.update_downtimes_and_comments()
345 self.scheduler_loop(1,[], do_sleep=False) #push the downtime notification
346 #self.worker_loop() # push the downtime notification
347 time.sleep(10)
348 #----------------------------------------------------------------
349 # check if a downtime object exists (scheduler and host)
350 # check the start downtime notification
351 #----------------------------------------------------------------
352 self.assert_(len(self.sched.downtimes) == 1)
353 self.assert_(len(host.downtimes) == 1)
354 self.assert_(host.in_scheduled_downtime)
355 self.assert_(host.downtimes[0] in self.sched.downtimes.values())
356 self.assert_(host.downtimes[0].fixed)
357 self.assert_(host.downtimes[0].is_in_effect)
358 self.assert_(not host.downtimes[0].can_be_deleted)
359 self.assert_(len(self.sched.comments) == 1)
360 self.assert_(len(host.comments) == 1)
361 self.assert_(host.comments[0] in self.sched.comments.values())
362 self.assert_(host.downtimes[0].comment_id == host.comments[0].id)
363 self.scheduler_loop(4, [[host, 2, 'DOWN']], do_sleep=True)
364 self.show_logs()
365 self.show_actions()
366 self.assert_(self.count_logs() == 8) # start downt, notif downt, soft1, evt1, soft 2, evt2, hard 3, evt3
367 self.clear_logs()
368 self.clear_actions()
369 #----------------------------------------------------------------
370 # now the service becomes critical
371 # check that the host has a downtime, _not_ the service
372 # check logs, (no) notifications and eventhandlers
373 #----------------------------------------------------------------
374 print "now the service goes critical"
375 self.scheduler_loop(4, [[svc, 2, 'CRITICAL']], do_sleep=True)
376 self.assert_(len(self.sched.downtimes) == 1)
377 self.assert_(len(svc.downtimes) == 0)
378 self.assert_(not svc.in_scheduled_downtime)
379 self.assert_(svc.host.in_scheduled_downtime)
380 self.show_logs()
381 self.show_actions()
382 # soft 1, evt1, hard 2, evt2
383 self.assert_(self.count_logs() == 4)
384 self.clear_logs()
385 self.clear_actions()
386 #----------------------------------------------------------------
387 # the host comes UP again
388 # check log messages, (no) notifications and eventhandlers
389 #----------------------------------------------------------------
390 print "now the host comes up"
391 self.scheduler_loop(2, [[host, 0, 'UP']], do_sleep=True)
392 self.show_logs()
393 self.show_actions()
394 # hard 3, eventhandler
395 self.assert_(self.count_logs() == 2) # up, evt
396 self.clear_logs()
397 self.clear_actions()
398 #----------------------------------------------------------------
399 # the service becomes OK again
400 # check log messages, (no) notifications and eventhandlers
401 # check if the stop downtime notification is the only one
402 #----------------------------------------------------------------
403 self.scheduler_loop(2, [[host, 0, 'UP']], do_sleep=True)
404 self.assert_(len(self.sched.downtimes) == 0)
405 self.assert_(len(host.downtimes) == 0)
406 self.assert_(not host.in_scheduled_downtime)
407 self.show_logs()
408 self.show_actions()
409 self.assert_(self.log_match(1, 'HOST DOWNTIME ALERT.*STOPPED'))
410 self.clear_logs()
411 self.clear_actions()
415 # todo
416 # checks return 1=warn. this means normally up
417 # set use_aggressive_host_checking which treats warn as down
419 # send host into downtime
420 # run service checks with result critical
421 # host exits downtime
422 # does the service send a notification like when it exts a svc dt?
423 # check for notifications
425 # host is down and in downtime. what about service eventhandlers?
428 def test_notification_after_cancel_flexible_svc_downtime(self):
429 # schedule flexible downtime
430 # good check
431 # bad check -> SOFT;1
432 # eventhandler SOFT;1
433 # bad check -> HARD;2
434 # downtime alert
435 # eventhandler HARD;2
436 # cancel downtime
437 # bad check -> HARD;2
438 # notification critical
440 pass
442 if __name__ == '__main__':
443 unittest.main()