Add : service without host will be just droped, like Nagios.
[shinken.git] / test / test_escalations.py
blobd5a73c44de52c9046b05f4c7c782ad15df47e36d
1 #!/usr/bin/env python2.6
2 #Copyright (C) 2009-2010 :
3 # Gabes Jean, naparuba@gmail.com
4 # Gerhard Lausser, Gerhard.Lausser@consol.de
6 #This file is part of Shinken.
8 #Shinken is free software: you can redistribute it and/or modify
9 #it under the terms of the GNU Affero General Public License as published by
10 #the Free Software Foundation, either version 3 of the License, or
11 #(at your option) any later version.
13 #Shinken is distributed in the hope that it will be useful,
14 #but WITHOUT ANY WARRANTY; without even the implied warranty of
15 #MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 #GNU Affero General Public License for more details.
18 #You should have received a copy of the GNU Affero General Public License
19 #along with Shinken. If not, see <http://www.gnu.org/licenses/>.
23 # This file is used to test host- and service-downtimes.
27 #It's ugly I know....
28 from shinken_test import *
31 class TestEscalations(ShinkenTest):
32 def setUp(self):
33 self.setup_with_file('etc/nagios_escalations.cfg')
36 def test_simple_escalation(self):
37 self.print_header()
38 # retry_interval 2
39 # critical notification
40 # run loop -> another notification
41 now = time.time()
42 host = self.sched.hosts.find_by_name("test_host_0")
43 host.checks_in_progress = []
44 host.act_depend_of = [] # ignore the router
45 svc = self.sched.services.find_srv_by_name_and_hostname("test_host_0", "test_ok_0")
47 #To make tests quicker we make notifications send very quickly
48 svc.notification_interval = 0.001
50 svc.checks_in_progress = []
51 svc.act_depend_of = [] # no hostchecks on critical checkresults
52 #--------------------------------------------------------------
53 # initialize host/service state
54 #--------------------------------------------------------------
55 self.scheduler_loop(1, [[host, 0, 'UP']], do_sleep=True, sleep_time=0.1)
56 print "- 1 x OK -------------------------------------"
57 self.scheduler_loop(1, [[svc, 0, 'OK']], do_sleep=True, sleep_time=0.1)
59 self.assert_(svc.current_notification_number == 0)
61 tolevel2 = self.sched.conf.escalations.find_by_name('ToLevel2')
62 self.assert_(tolevel2 is not None)
63 self.assert_(tolevel2 in svc.escalations)
64 tolevel3 = self.sched.conf.escalations.find_by_name('ToLevel3')
65 self.assert_(tolevel3 is not None)
66 self.assert_(tolevel3 in svc.escalations)
69 for es in svc.escalations:
70 print es.__dict__
72 #--------------------------------------------------------------
73 # service reaches soft;1
74 # there must not be any notification
75 #--------------------------------------------------------------
76 print "- 1 x BAD get soft -------------------------------------"
77 self.scheduler_loop(1, [[svc, 2, 'BAD']], do_sleep=True, sleep_time=0.1)
78 # check_notification: not (soft)
79 print "---current_notification_number", svc.current_notification_number
80 #--------------------------------------------------------------
81 # service reaches hard;2
82 # a notification must have been created
83 # notification number must be 1
84 #--------------------------------------------------------------
85 print "- 1 x BAD get hard -------------------------------------"
86 self.scheduler_loop(1, [[svc, 2, 'BAD']], do_sleep=True, sleep_time=0.1)
88 # We check if we really notify the level1
89 self.assert_(self.any_log_match('SERVICE NOTIFICATION: level1.*;CRITICAL;'))
90 self.show_and_clear_logs()
91 #self.show_and_clear_actions()
92 self.show_actions()
93 print svc.notifications_in_progress
94 for n in svc.notifications_in_progress.values():
95 print n
96 # check_notification: yes (hard)
97 print "---current_notification_number", svc.current_notification_number
98 # notification_number is already sent. the next one has been scheduled
99 # and is waiting for notification_interval to pass. so the current
100 # number is 2
101 self.assert_(svc.current_notification_number == 1)
102 print "OK, level1 is notified, notif nb = 1"
104 print "---------------------------------1st round with a hard"
105 print "find a way to get the number of the last reaction"
106 cnn = svc.current_notification_number
107 print "- 1 x BAD repeat -------------------------------------"
108 self.scheduler_loop(1, [[svc, 2, 'BAD']], do_sleep=True, sleep_time=0.1)
110 # Now we raise the notif number of 2, so we can escalade
111 self.assert_(self.any_log_match('SERVICE NOTIFICATION: level2.*;CRITICAL;'))
112 self.show_and_clear_logs()
113 self.show_actions()
114 print "cnn and cur", cnn, svc.current_notification_number
115 self.assert_(svc.current_notification_number > cnn)
116 cnn = svc.current_notification_number
118 # One more bad, we go 3
119 self.scheduler_loop(1, [[svc, 2, 'BAD']], do_sleep=True, sleep_time=0.1)
120 self.assert_(self.any_log_match('SERVICE NOTIFICATION: level2.*;CRITICAL;'))
121 self.show_and_clear_logs()
123 # We go 4, still level2
124 self.scheduler_loop(1, [[svc, 2, 'BAD']], do_sleep=True, sleep_time=0.1)
125 self.assert_(self.any_log_match('SERVICE NOTIFICATION: level2.*;CRITICAL;'))
126 self.show_and_clear_logs()
128 # We go 5! we escalade to level3
130 self.scheduler_loop(1, [[svc, 2, 'BAD']], do_sleep=True, sleep_time=0.1)
131 self.assert_(self.any_log_match('SERVICE NOTIFICATION: level3.*;CRITICAL;'))
132 self.show_and_clear_logs()
134 # Now we send 10 more notif, we must be still level5
135 for i in range(10):
136 self.scheduler_loop(1, [[svc, 2, 'BAD']], do_sleep=True, sleep_time=0.1)
137 self.assert_(self.any_log_match('SERVICE NOTIFICATION: level3.*;CRITICAL;'))
138 self.show_and_clear_logs()
140 # Now we recover, it will be fun because all of level{1,2,3} must be send a
141 # notif
142 self.scheduler_loop(2, [[svc, 0, 'OK']], do_sleep=True, sleep_time=0.1)
143 self.show_actions()
144 self.assert_(self.any_log_match('SERVICE NOTIFICATION: level1.*;OK;'))
145 self.assert_(self.any_log_match('SERVICE NOTIFICATION: level2.*;OK;'))
146 self.assert_(self.any_log_match('SERVICE NOTIFICATION: level3.*;OK;'))
147 self.show_and_clear_logs()
152 def test_time_based_escalation(self):
153 self.print_header()
154 # retry_interval 2
155 # critical notification
156 # run loop -> another notification
157 now = time.time()
158 host = self.sched.hosts.find_by_name("test_host_0")
159 host.checks_in_progress = []
160 host.act_depend_of = [] # ignore the router
161 svc = self.sched.services.find_srv_by_name_and_hostname("test_host_0", "test_ok_0_time")
163 #To make tests quicker we make notifications send very quickly
164 svc.notification_interval = 0.001
166 svc.checks_in_progress = []
167 svc.act_depend_of = [] # no hostchecks on critical checkresults
168 #--------------------------------------------------------------
169 # initialize host/service state
170 #--------------------------------------------------------------
171 self.scheduler_loop(1, [[host, 0, 'UP']], do_sleep=True, sleep_time=0.1)
172 print "- 1 x OK -------------------------------------"
173 self.scheduler_loop(1, [[svc, 0, 'OK']], do_sleep=True, sleep_time=0.1)
175 self.assert_(svc.current_notification_number == 0)
178 # We check if we correclty linked our escalations
179 tolevel2_time = self.sched.conf.escalations.find_by_name('ToLevel2-time')
180 self.assert_(tolevel2_time is not None)
181 self.assert_(tolevel2_time in svc.escalations)
182 tolevel3_time = self.sched.conf.escalations.find_by_name('ToLevel3-time')
183 self.assert_(tolevel3_time is not None)
184 self.assert_(tolevel3_time in svc.escalations)
186 # Go for the running part!
188 #--------------------------------------------------------------
189 # service reaches soft;1
190 # there must not be any notification
191 #--------------------------------------------------------------
192 print "- 1 x BAD get soft -------------------------------------"
193 self.scheduler_loop(1, [[svc, 2, 'BAD']], do_sleep=True, sleep_time=0.1)
194 # check_notification: not (soft)
195 print "---current_notification_number", svc.current_notification_number
196 #--------------------------------------------------------------
197 # service reaches hard;2
198 # a notification must have been created
199 # notification number must be 1
200 #--------------------------------------------------------------
201 print "- 1 x BAD get hard -------------------------------------"
202 self.scheduler_loop(1, [[svc, 2, 'BAD']], do_sleep=True, sleep_time=0.1)
204 # We check if we really notify the level1
205 self.assert_(self.any_log_match('SERVICE NOTIFICATION: level1.*;CRITICAL;'))
206 self.show_and_clear_logs()
207 self.show_actions()
209 # check_notification: yes (hard)
210 print "---current_notification_number", svc.current_notification_number
211 # notification_number is already sent. the next one has been scheduled
212 # and is waiting for notification_interval to pass. so the current
213 # number is 2
214 self.assert_(svc.current_notification_number == 1)
215 print "OK, level1 is notified, notif nb = 1"
217 print "---------------------------------1st round with a hard"
218 print "find a way to get the number of the last reaction"
219 cnn = svc.current_notification_number
220 print "- 1 x BAD repeat -------------------------------------"
222 # For the test, we hack the notif value because we do not wan to wait 1 hour!
223 for n in svc.notifications_in_progress.values():
224 # HOP, we say : it's already 3600 second since the last notif,
225 svc.notification_interval = 3600
226 # and we say that there is still 1hour since the notification creation
227 # so it will say the notification time is huge, and so it will escalade
228 n.creation_time = n.creation_time - 3600
230 self.scheduler_loop(1, [[svc, 2, 'BAD']], do_sleep=True, sleep_time=0.001)
232 # Now we raise a notification time of 1hour, we escalade to level2
233 self.assert_(self.any_log_match('SERVICE NOTIFICATION: level2.*;CRITICAL;'))
234 self.show_and_clear_logs()
235 self.show_actions()
237 print "cnn and cur", cnn, svc.current_notification_number
238 #We check that we really raise the notif number too
239 self.assert_(svc.current_notification_number > cnn)
240 cnn = svc.current_notification_number
242 for n in svc.notifications_in_progress.values():
243 # HOP, we say : it's already 3600 second since the last notif
244 n.t_to_go = time.time()
246 # One more bad, we say : he, it's still near 1 hour, so still level2
247 self.scheduler_loop(1, [[svc, 2, 'BAD']], do_sleep=True, sleep_time=0.1)
248 self.assert_(self.any_log_match('SERVICE NOTIFICATION: level2.*;CRITICAL;'))
249 self.show_and_clear_logs()
251 # Now we go for level3, so again we say : he, in fact we start one hour earlyer,
252 # so the total notification duration is near 2 hour, so we will raise level3
253 for n in svc.notifications_in_progress.values():
254 # HOP, we say : it's already 3600 second since the last notif,
255 n.t_to_go = time.time()
256 n.creation_time = n.creation_time - 3600
259 # One more, we bypass 7200, so now it's level3
260 self.scheduler_loop(1, [[svc, 2, 'BAD']], do_sleep=True, sleep_time=0.1)
261 self.assert_(self.any_log_match('SERVICE NOTIFICATION: level3.*;CRITICAL;'))
262 self.show_and_clear_logs()
265 # Now we send 10 more notif, we must be still level5
266 for i in range(10):
267 for n in svc.notifications_in_progress.values():
268 # HOP, we say : it's already 3600 second since the last notif,
269 n.t_to_go = time.time()
271 self.scheduler_loop(1, [[svc, 2, 'BAD']], do_sleep=True, sleep_time=0.1)
272 self.assert_(self.any_log_match('SERVICE NOTIFICATION: level3.*;CRITICAL;'))
273 self.show_and_clear_logs()
275 # Now we recover, it will be fun because all of level{1,2,3} must be send a
276 # recovery notif
277 self.scheduler_loop(2, [[svc, 0, 'OK']], do_sleep=True, sleep_time=0.1)
278 self.show_actions()
279 self.assert_(self.any_log_match('SERVICE NOTIFICATION: level1.*;OK;'))
280 self.assert_(self.any_log_match('SERVICE NOTIFICATION: level2.*;OK;'))
281 self.assert_(self.any_log_match('SERVICE NOTIFICATION: level3.*;OK;'))
282 self.show_and_clear_logs()
287 # Here we search to know if a escalation really short the notification
288 # interval if the escalation if BEFORE the next notification. For example
289 # let say we notify one a day, if the escalation if at 4hour, we need
290 # to notify at t=0, and get the next notification at 4h, and not 1day.
291 def test_time_based_escalation_with_shorting_interval(self):
292 self.print_header()
293 # retry_interval 2
294 # critical notification
295 # run loop -> another notification
296 now = time.time()
297 host = self.sched.hosts.find_by_name("test_host_0")
298 host.checks_in_progress = []
299 host.act_depend_of = [] # ignore the router
300 svc = self.sched.services.find_srv_by_name_and_hostname("test_host_0", "test_ok_0_time")
302 # To make tests quicker we make notifications send very quickly
303 # 1 day notification interval
304 svc.notification_interval = 1400
306 svc.checks_in_progress = []
307 svc.act_depend_of = [] # no hostchecks on critical checkresults
308 #--------------------------------------------------------------
309 # initialize host/service state
310 #--------------------------------------------------------------
311 self.scheduler_loop(1, [[host, 0, 'UP']], do_sleep=True, sleep_time=0.1)
312 print "- 1 x OK -------------------------------------"
313 self.scheduler_loop(1, [[svc, 0, 'OK']], do_sleep=True, sleep_time=0.1)
315 self.assert_(svc.current_notification_number == 0)
317 # We check that we really linked our escalations :)
318 tolevel2_time = self.sched.conf.escalations.find_by_name('ToLevel2-time')
319 self.assert_(tolevel2_time is not None)
320 self.assert_(tolevel2_time in svc.escalations)
321 tolevel3_time = self.sched.conf.escalations.find_by_name('ToLevel3-time')
322 self.assert_(tolevel3_time is not None)
323 self.assert_(tolevel3_time in svc.escalations)
325 #--------------------------------------------------------------
326 # service reaches soft;1
327 # there must not be any notification
328 #--------------------------------------------------------------
329 print "- 1 x BAD get soft -------------------------------------"
330 self.scheduler_loop(1, [[svc, 2, 'BAD']], do_sleep=True, sleep_time=0.1)
331 # check_notification: not (soft)
332 print "---current_notification_number", svc.current_notification_number
333 #--------------------------------------------------------------
334 # service reaches hard;2
335 # a notification must have been created
336 # notification number must be 1
337 #--------------------------------------------------------------
338 print "- 1 x BAD get hard -------------------------------------"
339 self.scheduler_loop(1, [[svc, 2, 'BAD']], do_sleep=True, sleep_time=0.1)
341 # We check if we really notify the level1
342 self.assert_(self.any_log_match('SERVICE NOTIFICATION: level1.*;CRITICAL;'))
343 self.show_and_clear_logs()
344 self.show_actions()
346 # check_notification: yes (hard)
347 print "---current_notification_number", svc.current_notification_number
348 # notification_number is already sent. the next one has been scheduled
349 # and is waiting for notification_interval to pass. so the current
350 # number is 2
351 self.assert_(svc.current_notification_number == 1)
352 print "OK, level1 is notified, notif nb = 1"
354 print "---------------------------------1st round with a hard"
355 print "find a way to get the number of the last reaction"
356 cnn = svc.current_notification_number
357 print "- 1 x BAD repeat -------------------------------------"
360 # Now we go for the level2 escalation, so we will need to say : he, it's 1 hour since the begining :p
361 print "*************Next", svc.notification_interval * svc.__class__.interval_length
363 # first, we check if the next notification will really be near 1 hour because the escalation
364 # to level2 is asking for it. If it don't, the standard was 1 day!
365 for n in svc.notifications_in_progress.values():
366 next = svc.get_next_notification_time(n)
367 print next - now
368 # Check if we find the next notification for the next hour,
369 # and not for the next day like we ask before
370 self.assert_(abs(next - now - 3600) < 10)
372 # And we hack the notification so we can raise really the level2 escalation
373 for n in svc.notifications_in_progress.values():
374 n.t_to_go = time.time()
375 n.creation_time -= 3600
377 # We go in trouble too
378 self.scheduler_loop(1, [[svc, 2, 'BAD']], do_sleep=True, sleep_time=0.001)
380 # Now we raise the time since the begining at 1 hour, so we can escalade
381 self.assert_(self.any_log_match('SERVICE NOTIFICATION: level2.*;CRITICAL;'))
382 self.show_and_clear_logs()
383 self.show_actions()
385 print "Level 2 got warn, now we search for level3"
386 print "cnn and cur", cnn, svc.current_notification_number
387 self.assert_(svc.current_notification_number > cnn)
388 cnn = svc.current_notification_number
390 # Now the same thing, but for level3, so one more hour
391 for n in svc.notifications_in_progress.values():
392 # HOP, we say : it's already 3600 second since the last notif,
393 n.t_to_go = time.time()
394 n.creation_time -= 3600
396 # One more bad, we say : he, it's 7200 sc of notif, so must be still level3
397 self.scheduler_loop(1, [[svc, 2, 'BAD']], do_sleep=True, sleep_time=0.1)
398 self.assert_(self.any_log_match('SERVICE NOTIFICATION: level3.*;CRITICAL;'))
399 self.show_and_clear_logs()
401 for n in svc.notifications_in_progress.values():
402 # we say that the next notif will be right now
403 # so we can raise a notif now
404 n.t_to_go = time.time()
406 # One more, we bypass 7200, so now it's still level3
407 self.scheduler_loop(1, [[svc, 2, 'BAD']], do_sleep=True, sleep_time=0.1)
408 self.assert_(self.any_log_match('SERVICE NOTIFICATION: level3.*;CRITICAL;'))
409 self.show_and_clear_logs()
412 # Now we send 10 more notif, we must be still level3
413 for i in range(10):
414 for n in svc.notifications_in_progress.values():
415 # HOP, we say : it's already 3600 second since the last notif,
416 n.t_to_go = time.time()
418 self.scheduler_loop(1, [[svc, 2, 'BAD']], do_sleep=True, sleep_time=0.1)
419 self.assert_(self.any_log_match('SERVICE NOTIFICATION: level3.*;CRITICAL;'))
420 self.show_and_clear_logs()
422 # Now we recover, it will be fun because all of level{1,2,3} must be send a
423 # recovery notif
424 self.scheduler_loop(2, [[svc, 0, 'OK']], do_sleep=True, sleep_time=0.1)
425 self.show_actions()
426 self.assert_(self.any_log_match('SERVICE NOTIFICATION: level1.*;OK;'))
427 self.assert_(self.any_log_match('SERVICE NOTIFICATION: level2.*;OK;'))
428 self.assert_(self.any_log_match('SERVICE NOTIFICATION: level3.*;OK;'))
429 self.show_and_clear_logs()
433 if __name__ == '__main__':
434 unittest.main()