1 #!/usr/bin/env python2.6
2 #Copyright (C) 2009-2010 :
3 # Gabes Jean, naparuba@gmail.com
4 # Gerhard Lausser, Gerhard.Lausser@consol.de
6 #This file is part of Shinken.
8 #Shinken is free software: you can redistribute it and/or modify
9 #it under the terms of the GNU Affero General Public License as published by
10 #the Free Software Foundation, either version 3 of the License, or
11 #(at your option) any later version.
13 #Shinken is distributed in the hope that it will be useful,
14 #but WITHOUT ANY WARRANTY; without even the implied warranty of
15 #MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 #GNU Affero General Public License for more details.
18 #You should have received a copy of the GNU Affero General Public License
19 #along with Shinken. If not, see <http://www.gnu.org/licenses/>.
# This file is used to test service notification escalations.
28 from shinken_test
import *
class TestEscalations(ShinkenTest):
    """Notification escalation tests for the services of test_host_0.

    Each test drives a service into a CRITICAL state and checks, through
    the 'SERVICE NOTIFICATION' log lines, that notifications move from
    level1 to level2 and then level3, and that all three levels are
    notified again when the service recovers.
    """

    def setUp(self):
        """Load the escalation test configuration."""
        self.setup_with_file('etc/nagios_escalations.cfg')

    # ------------------------------------------------------------------
    # Private helpers shared by the three tests
    # ------------------------------------------------------------------
    def _prepare_host_and_service(self, service_description,
                                  notification_interval):
        """Return (host, svc) for test_host_0, reset for a clean run.

        service_description   -- name of the service to look up on
                                 test_host_0.
        notification_interval -- value forced on svc.notification_interval.
        """
        host = self.sched.hosts.find_by_name("test_host_0")
        host.checks_in_progress = []
        host.act_depend_of = []  # ignore the router
        svc = self.sched.services.find_srv_by_name_and_hostname(
            "test_host_0", service_description)
        svc.notification_interval = notification_interval
        svc.checks_in_progress = []
        svc.act_depend_of = []  # no hostchecks on critical checkresults
        return host, svc

    def _run_check(self, obj, status, output, count=1, sleep_time=0.1):
        """Run `count` scheduler loops with one [obj, status, output] entry.

        NOTE(review): status/output are presumably the check exit status
        and plugin output -- confirm against ShinkenTest.scheduler_loop.
        """
        self.scheduler_loop(count, [[obj, status, output]],
                            do_sleep=True, sleep_time=sleep_time)

    def _set_initial_ok_state(self, host, svc):
        """Put the host UP and the service OK; no notification expected."""
        self._run_check(host, 0, 'UP')
        print("- 1 x OK -------------------------------------")
        self._run_check(svc, 0, 'OK')
        self.assert_(svc.current_notification_number == 0)

    def _assert_escalations_linked(self, svc, names):
        """Check that each named escalation exists and is attached to svc."""
        for name in names:
            escalation = self.sched.conf.escalations.find_by_name(name)
            self.assert_(escalation is not None)
            self.assert_(escalation in svc.escalations)

    def _assert_notified(self, level, state):
        """Assert that a notification for `level` in `state` was logged."""
        pattern = 'SERVICE NOTIFICATION: %s.*;%s;' % (level, state)
        self.assert_(self.any_log_match(pattern))

    def _bad_check_expecting(self, svc, level, sleep_time=0.1):
        """Send one more BAD result and expect a CRITICAL notif for `level`.

        The logs are shown and cleared afterwards so that the next
        expectation only sees fresh log lines.
        """
        self._run_check(svc, 2, 'BAD', sleep_time=sleep_time)
        self._assert_notified(level, 'CRITICAL')
        self.show_and_clear_logs()

    def _reach_hard_critical(self, svc):
        """Send two BAD results (soft, then hard) and expect level1."""
        # First BAD result: the service is only in a soft state. The
        # notification number is printed for inspection, not asserted.
        print("- 1 x BAD get soft -------------------------------------")
        self._run_check(svc, 2, 'BAD')
        print("---current_notification_number %s"
              % (svc.current_notification_number,))
        # Second BAD result: the service reaches a hard state, so the
        # first notification must go to level1.
        print("- 1 x BAD get hard -------------------------------------")
        self._run_check(svc, 2, 'BAD')
        self._assert_notified('level1', 'CRITICAL')
        self.show_and_clear_logs()

    def _check_first_notification(self, svc):
        """Assert the notification number is 1 and return it."""
        print("---current_notification_number %s"
              % (svc.current_notification_number,))
        # The first notification has been sent and the next one is waiting
        # for notification_interval to pass, so the number must be 1.
        self.assert_(svc.current_notification_number == 1)
        print("OK, level1 is notified, notif nb = 1")
        print("---------------------------------1st round with a hard")
        print("find a way to get the number of the last reaction")
        cnn = svc.current_notification_number
        print("- 1 x BAD repeat -------------------------------------")
        return cnn

    def _assert_notification_number_raised(self, svc, previous):
        """Assert that the notification number grew past `previous`."""
        print("cnn and cur %s %s"
              % (previous, svc.current_notification_number))
        self.assert_(svc.current_notification_number > previous)

    def _make_notifications_due(self, svc, age_by=0):
        """Make every pending notification of svc due right now.

        age_by -- number of seconds subtracted from each notification's
                  creation_time, so the problem looks that much older to
                  the time-based escalations.
        """
        for notif in svc.notifications_in_progress.values():
            # The next notification is said to be due right now.
            notif.t_to_go = time.time()
            if age_by:
                notif.creation_time -= age_by

    def _recover_and_check_all_levels(self, svc):
        """Send two OK results; level1, level2 and level3 must be notified."""
        self._run_check(svc, 0, 'OK', count=2)
        self._assert_notified('level1', 'OK')
        self._assert_notified('level2', 'OK')
        self._assert_notified('level3', 'OK')
        self.show_and_clear_logs()

    # ------------------------------------------------------------------
    # Tests
    # ------------------------------------------------------------------
    def test_simple_escalation(self):
        """Escalate on successive notifications: level1, level2, level3."""
        # To make the test quicker, notifications are sent very quickly.
        host, svc = self._prepare_host_and_service("test_ok_0", 0.001)
        self._set_initial_ok_state(host, svc)
        self._assert_escalations_linked(svc, ('ToLevel2', 'ToLevel3'))

        # NOTE(review): the body of this loop is not visible in the
        # reviewed source (presumably a debug dump of each escalation).
        # Restore it or delete the loop.
        for es in svc.escalations:
            pass

        self._reach_hard_critical(svc)

        print(svc.notifications_in_progress)
        # NOTE(review): the body of this loop is not visible in the
        # reviewed source either (presumably a debug dump of each
        # notification). Restore it or delete the loop.
        for n in svc.notifications_in_progress.values():
            pass

        cnn = self._check_first_notification(svc)

        # The next notification must be escalated to level2 ...
        self._bad_check_expecting(svc, 'level2')
        # ... and the notification number must have been raised.
        self._assert_notification_number_raised(svc, cnn)

        # Two more BAD results: still level2.
        self._bad_check_expecting(svc, 'level2')
        self._bad_check_expecting(svc, 'level2')

        # One more: we escalate to level3.
        self._bad_check_expecting(svc, 'level3')

        # One more notification: we must still be at level3.
        self._bad_check_expecting(svc, 'level3')

        # On recovery all of level{1,2,3} must get an OK notification.
        self._recover_and_check_all_levels(svc)

    def test_time_based_escalation(self):
        """Escalate on the age of the problem instead of the notif count."""
        # To make the test quicker, notifications are sent very quickly.
        host, svc = self._prepare_host_and_service("test_ok_0_time", 0.001)
        self._set_initial_ok_state(host, svc)
        # Check that the time-based escalations are correctly linked.
        self._assert_escalations_linked(
            svc, ('ToLevel2-time', 'ToLevel3-time'))

        self._reach_hard_critical(svc)
        cnn = self._check_first_notification(svc)

        # We do not want to wait one hour, so the notifications are hacked.
        for notif in svc.notifications_in_progress.values():
            # The notification interval becomes 3600 ...
            svc.notification_interval = 3600
            # ... and the notification is said to have been created one
            # hour ago, so the problem looks old enough to escalate.
            notif.creation_time = notif.creation_time - 3600

        # The problem now looks one hour old: we escalate to level2.
        self._bad_check_expecting(svc, 'level2', sleep_time=0.001)
        # The notification number must have been raised too.
        self._assert_notification_number_raised(svc, cnn)

        # One more BAD with the same age: still level2.
        self._make_notifications_due(svc)
        self._bad_check_expecting(svc, 'level2')

        # Age the problem by one more hour (about two hours in total):
        # level3 must now be notified.
        self._make_notifications_due(svc, age_by=3600)
        self._bad_check_expecting(svc, 'level3')

        # One more notification: we must still be at level3.
        self._make_notifications_due(svc)
        self._bad_check_expecting(svc, 'level3')

        # On recovery all of level{1,2,3} must get an OK notification.
        self._recover_and_check_all_levels(svc)

    # Here we check that an escalation really shortens the notification
    # interval when the escalation is due BEFORE the next notification.
    # For example, if we notify once a day and the escalation is at 4 hours,
    # we need to notify at t=0 and get the next notification at 4h, not 1 day.
    def test_time_based_escalation_with_shorting_interval(self):
        """Check that a time-based escalation shortens the notif interval."""
        # Reference time for the next-notification check below.
        now = time.time()

        # Deliberately long notification interval, so that only the
        # escalation can explain a next notification one hour away.
        # NOTE(review): the original comment called 1400 "1 day"; a day
        # would be 1440 if the interval unit is one minute -- confirm.
        host, svc = self._prepare_host_and_service("test_ok_0_time", 1400)
        self._set_initial_ok_state(host, svc)
        # Check that the time-based escalations are correctly linked.
        self._assert_escalations_linked(
            svc, ('ToLevel2-time', 'ToLevel3-time'))

        self._reach_hard_critical(svc)
        cnn = self._check_first_notification(svc)

        print("*************Next %s"
              % (svc.notification_interval * svc.__class__.interval_length,))

        # The next notification must be about one hour away, because the
        # level2 escalation asks for it, and not a full standard interval.
        for notif in svc.notifications_in_progress.values():
            next_notif_time = svc.get_next_notification_time(notif)
            self.assert_(abs(next_notif_time - now - 3600) < 10)

        # Hack the notifications so the level2 escalation is really raised.
        self._make_notifications_due(svc, age_by=3600)
        self._bad_check_expecting(svc, 'level2', sleep_time=0.001)

        print("Level 2 got warn, now we search for level3")
        self._assert_notification_number_raised(svc, cnn)

        # Same thing for level3: one more hour, 7200 seconds in total.
        self._make_notifications_due(svc, age_by=3600)
        self._bad_check_expecting(svc, 'level3')

        # The next notification is due right now: still level3.
        self._make_notifications_due(svc)
        self._bad_check_expecting(svc, 'level3')

        # One more notification: we must still be at level3.
        self._make_notifications_due(svc)
        self._bad_check_expecting(svc, 'level3')

        # On recovery all of level{1,2,3} must get an OK notification.
        self._recover_and_check_all_levels(svc)
433 if __name__
== '__main__':