1 #!/usr/bin/env python2.6
2 #Copyright (C) 2009-2010 :
3 # Gabes Jean, naparuba@gmail.com
4 # Gerhard Lausser, Gerhard.Lausser@consol.de
6 #This file is part of Shinken.
8 #Shinken is free software: you can redistribute it and/or modify
9 #it under the terms of the GNU Affero General Public License as published by
10 #the Free Software Foundation, either version 3 of the License, or
11 #(at your option) any later version.
13 #Shinken is distributed in the hope that it will be useful,
14 #but WITHOUT ANY WARRANTY; without even the implied warranty of
15 #MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 #GNU Affero General Public License for more details.
18 #You should have received a copy of the GNU Affero General Public License
19 #along with Shinken. If not, see <http://www.gnu.org/licenses/>.
# This file is used to test host and service notifications
# (including their interaction with host downtimes).
import time

from shinken_test import unittest, ShinkenTest
class TestConfig(ShinkenTest):
    """Notification tests for service ``test_ok_0`` on host ``test_host_0``.

    Every test feeds check results into the scheduler through
    ``self.scheduler_loop`` and then inspects
    ``svc.current_notification_number``, the pending actions and the
    generated log lines.
    """

    #------------------------------------------------------------------
    # private helpers (not collected as tests: no ``test`` prefix)
    #------------------------------------------------------------------
    def _trace(self, *parts):
        """Print ``parts`` separated by single spaces.

        Produces the same output as the Python 2 ``print a, b`` statement
        while remaining valid syntax on Python 3.
        """
        print(" ".join([str(part) for part in parts]))

    def _prepare_host_and_service(self):
        """Return ``(host, svc)`` with pending checks and dependencies reset."""
        host = self.sched.hosts.find_by_name("test_host_0")
        host.checks_in_progress = []
        host.act_depend_of = []  # ignore the router
        svc = self.sched.services.find_srv_by_name_and_hostname(
            "test_host_0", "test_ok_0")
        svc.checks_in_progress = []
        svc.act_depend_of = []  # no hostchecks on critical checkresults
        return host, svc

    def _set_contact_svc_notifications(self, enabled):
        """Send the external command that (de)activates service
        notifications for ``test_contact``, stamped with the current time."""
        now = time.time()
        if enabled:
            cmd = "[%lu] ENABLE_CONTACT_SVC_NOTIFICATIONS;test_contact" % now
        else:
            cmd = "[%lu] DISABLE_CONTACT_SVC_NOTIFICATIONS;test_contact" % now
        self.sched.run_external_command(cmd)

    #------------------------------------------------------------------
    # tests
    #------------------------------------------------------------------
    def test_continuous_notifications(self):
        """A service staying CRITICAL keeps raising its notification number,
        except while the contact has service notifications disabled."""
        # critical notification
        # run loop -> another notification
        host, svc = self._prepare_host_and_service()
        # To make tests quicker we make notifications send very quickly
        svc.notification_interval = 0.001
        #--------------------------------------------------------------
        # initialize host/service state
        #--------------------------------------------------------------
        self.scheduler_loop(1, [[host, 0, 'UP']], do_sleep=True, sleep_time=0.1)
        self._trace("- 1 x OK -------------------------------------")
        self.scheduler_loop(1, [[svc, 0, 'OK']], do_sleep=True, sleep_time=0.1)
        self.assertEqual(svc.current_notification_number, 0)
        #--------------------------------------------------------------
        # service reaches soft;1
        # there must not be any notification
        #--------------------------------------------------------------
        self._trace("- 1 x BAD get soft -------------------------------------")
        self.scheduler_loop(1, [[svc, 2, 'BAD']], do_sleep=True, sleep_time=0.1)
        # check_notification: not (soft)
        self._trace("---current_notification_number",
                    svc.current_notification_number)
        #--------------------------------------------------------------
        # service reaches hard;2
        # a notification must have been created
        # notification number must be 1
        #--------------------------------------------------------------
        self._trace("- 1 x BAD get hard -------------------------------------")
        self.scheduler_loop(1, [[svc, 2, 'BAD']], do_sleep=True, sleep_time=0.1)
        self.show_and_clear_logs()
        self._trace(svc.notifications_in_progress)
        # Debug dump of every notification still in progress.
        for n in svc.notifications_in_progress.values():
            self._trace(n)
        # check_notification: yes (hard)
        self._trace("---current_notification_number",
                    svc.current_notification_number)
        # The first notification is already sent. The next one has been
        # scheduled and is waiting for notification_interval to pass, so
        # the current number is 1.
        self.assertEqual(svc.current_notification_number, 1)
        self._trace("---------------------------------1st round with a hard")
        self._trace("find a way to get the number of the last reaction")
        cnn = svc.current_notification_number
        self._trace("- 5 x BAD repeat -------------------------------------")
        self.scheduler_loop(1, [[svc, 2, 'BAD']], do_sleep=True, sleep_time=0.1)
        self.show_and_clear_logs()
        self._trace("cnn and cur", cnn, svc.current_notification_number)
        self.assertTrue(svc.current_notification_number > cnn)
        cnn = svc.current_notification_number
        self.scheduler_loop(1, [[svc, 2, 'BAD']], do_sleep=True, sleep_time=0.1)
        self.show_and_clear_logs()
        self._trace("svc.current_notification_number, cnn",
                    svc.current_notification_number, cnn)
        self.assertTrue(svc.current_notification_number > cnn)
        #--------------------------------------------------------------
        # 2 more cycles: the notification number must keep growing
        #--------------------------------------------------------------
        cnn = svc.current_notification_number
        self.scheduler_loop(2, [[svc, 2, 'BAD']], do_sleep=True, sleep_time=0.1)
        self.show_and_clear_logs()
        self._trace("svc.current_notification_number, cnn",
                    svc.current_notification_number, cnn)
        self.assertTrue(svc.current_notification_number > cnn)
        #--------------------------------------------------------------
        # another cycle would theoretically mean a new notification
        # BUT: test_contact filters notifications
        # we do not raise current_notification_number if no mail was sent
        #--------------------------------------------------------------
        self._set_contact_svc_notifications(False)
        cnn = svc.current_notification_number
        self.scheduler_loop(1, [[svc, 2, 'BAD']], do_sleep=True, sleep_time=0.1)
        self.show_and_clear_logs()
        self.assertEqual(svc.current_notification_number, cnn)
        #--------------------------------------------------------------
        # again a normal cycle
        # test_contact receives his mail
        #--------------------------------------------------------------
        self._set_contact_svc_notifications(True)
        # cnn is deliberately NOT refreshed: exactly one more notification
        # than before the disabled cycle is expected.
        self.scheduler_loop(1, [[svc, 2, 'BAD']], do_sleep=True, sleep_time=0.1)
        self.show_and_clear_logs()
        self._trace("svc.current_notification_number, cnn",
                    svc.current_notification_number, cnn)
        self.assertEqual(svc.current_notification_number, cnn + 1)
        #--------------------------------------------------------------
        # now recover. there must be no scheduled/inpoller notification
        #--------------------------------------------------------------
        self.scheduler_loop(1, [[svc, 0, 'GOOD']], do_sleep=True, sleep_time=0.1)
        self.show_and_clear_logs()
        self.show_and_clear_actions()
        self.assertEqual(svc.current_notification_number, 0)

    def test_continuous_notifications_delayed(self):
        """With ``first_notification_delay`` set, the first notification is
        only sent once the delay after the first non-OK result is over."""
        # critical notification
        # run loop -> another notification
        host, svc = self._prepare_host_and_service()
        # To make tests quicker we make notifications send very quickly
        svc.notification_interval = 0.001  # and send immediately then
        # 0.1 interval units; that is 6s if interval_length is 60
        # -- TODO confirm the configured interval_length
        svc.first_notification_delay = 0.1
        self.scheduler_loop(1, [[host, 0, 'UP']], do_sleep=True, sleep_time=1)
        #-----------------------------------------------------------------
        # initialize with a good check. there must be no pending notification
        #-----------------------------------------------------------------
        self.scheduler_loop(1, [[svc, 0, 'OK']], do_sleep=True, sleep_time=1)
        self.show_and_clear_logs()
        self.show_and_clear_actions()
        self.assertEqual(svc.current_notification_number, 0)
        #-----------------------------------------------------------------
        # check fails and enters soft state.
        # there must be no notification, only the event handler
        #-----------------------------------------------------------------
        self.scheduler_loop(1, [[svc, 1, 'BAD']], do_sleep=True, sleep_time=1)
        self.assertEqual(self.count_actions(), 1)
        self._trace(svc.last_time_warning, svc.last_time_critical,
                    svc.last_time_unknown, svc.last_time_ok)
        # The delay is counted from the last non-OK timestamp.
        last_time_not_ok = svc.last_time_non_ok_or_up()
        deadline = (last_time_not_ok
                    + svc.first_notification_delay
                    * svc.__class__.interval_length)
        #-----------------------------------------------------------------
        # check fails again and enters hard state.
        # now there is a (scheduled for later) notification and an event handler
        # current_notification_number is still 0, until notifications
        # have actually been sent
        #-----------------------------------------------------------------
        self.scheduler_loop(1, [[svc, 2, 'BAD']], do_sleep=True, sleep_time=0.1)
        self.show_and_clear_logs()
        self.assertEqual(svc.current_notification_number, 0)
        #-----------------------------------------------------------------
        # repeat bad checks during the delay time
        # there is 1 action which is the scheduled notification
        #-----------------------------------------------------------------
        while deadline > time.time():
            self.scheduler_loop(1, [[svc, 2, 'BAD']], do_sleep=True, sleep_time=0.1)
            self.show_and_clear_logs()
            self._trace(deadline - time.time())
        #-----------------------------------------------------------------
        # now the delay period is over and the notification can be sent
        # with the next bad check
        # 1 notification was sent, so current_notification_number is 1
        # NOTE(review): the original comment spoke of 1 action, but the
        # assertion below expects 2 -- confirm which one is intended
        #-----------------------------------------------------------------
        self.scheduler_loop(1, [[svc, 2, 'BAD']], do_sleep=True, sleep_time=1)
        self._trace("Counted actions", self.count_actions())
        self.assertEqual(self.count_actions(), 2)
        self.assertEqual(svc.current_notification_number, 1)
        # master is zombie and removed from in_progress
        self.assertEqual(len(svc.notifications_in_progress), 1)
        self.assertTrue(self.log_match(1, 'SERVICE NOTIFICATION.*;CRITICAL;'))
        self.show_and_clear_logs()
        #-----------------------------------------------------------------
        # relax with a successful check
        # there are 2 actions, one notification and one eventhandler
        # current_notification_number was reset to 0
        #-----------------------------------------------------------------
        self.scheduler_loop(2, [[svc, 0, 'GOOD']], do_sleep=True, sleep_time=1)
        self.assertTrue(self.log_match(1, 'SERVICE ALERT.*;OK;'))
        self.assertTrue(self.log_match(2, 'SERVICE EVENT HANDLER.*;OK;'))
        self.assertTrue(self.log_match(3, 'SERVICE NOTIFICATION.*;OK;'))
        self.assertEqual(svc.current_notification_number, 0)
        self.assertEqual(len(svc.notifications_in_progress), 0)
        self.assertEqual(len(svc.notified_contacts), 0)
        self.show_and_clear_logs()
        self.show_and_clear_actions()

    def test_continuous_notifications_delayed_recovers_fast(self):
        """A service recovering before ``first_notification_delay`` expires
        must produce neither a problem nor a recovery notification."""
        # critical notification
        # run loop -> another notification
        host, svc = self._prepare_host_and_service()
        svc.first_notification_delay = 5
        self.scheduler_loop(1, [[host, 0, 'UP']], do_sleep=True, sleep_time=0.1)
        #-----------------------------------------------------------------
        # initialize with a good check. there must be no pending notification
        #-----------------------------------------------------------------
        self.scheduler_loop(1, [[svc, 0, 'OK']], do_sleep=True, sleep_time=0.1)
        self.show_and_clear_logs()
        self.show_and_clear_actions()
        self.assertEqual(svc.current_notification_number, 0)
        #-----------------------------------------------------------------
        # check fails and enters soft state.
        # there must be no notification, only the event handler
        #-----------------------------------------------------------------
        self.scheduler_loop(1, [[svc, 1, 'BAD']], do_sleep=True, sleep_time=0.1)
        self.show_and_clear_logs()
        self.assertEqual(self.count_actions(), 1)
        #-----------------------------------------------------------------
        # check fails again and enters hard state.
        # now there is a (scheduled for later) notification and an event handler
        # current_notification_number is still 0 (will be raised when
        # a notification is actually sent)
        #-----------------------------------------------------------------
        self.scheduler_loop(1, [[svc, 2, 'BAD']], do_sleep=True, sleep_time=0.1)
        self.show_and_clear_logs()
        self.assertEqual(self.count_actions(), 2)
        self.assertEqual(svc.current_notification_number, 0)
        #-----------------------------------------------------------------
        # repeat bad checks during the delay time
        # but only one time. we don't want to reach the deadline
        # there is one action: the pending notification
        #-----------------------------------------------------------------
        self.scheduler_loop(1, [[svc, 2, 'BAD']], do_sleep=True, sleep_time=0.1)
        self.show_and_clear_logs()
        self.assertEqual(self.count_actions(), 1)
        #-----------------------------------------------------------------
        # relax with a successful check
        # there is 1 action, the eventhandler.
        # there is a second action: the master recover notification
        # but it becomes a zombie very soon, because it has no effect
        #-----------------------------------------------------------------
        self.scheduler_loop(1, [[svc, 0, 'GOOD']], do_sleep=True, sleep_time=0.1)
        self.assertTrue(self.log_match(1, 'SERVICE ALERT.*;OK;'))
        self.assertTrue(self.log_match(2, 'SERVICE EVENT HANDLER.*;OK;'))
        self.assertFalse(self.log_match(3, 'SERVICE NOTIFICATION.*;OK;'))
        self.assertEqual(len(svc.notifications_in_progress), 0)
        self.assertEqual(len(svc.notified_contacts), 0)
        self.assertEqual(self.count_actions(), 2)
        self.show_and_clear_logs()
        self.show_and_clear_actions()

    def test_host_in_downtime_or_down_service_critical(self):
        """While the host is in a scheduled downtime, a service going
        CRITICAL must not raise a service notification."""
        # critical notification
        # run loop -> another notification
        host, svc = self._prepare_host_and_service()
        # To make tests quicker we make notifications send very quickly
        svc.notification_interval = 0.001
        #--------------------------------------------------------------
        # initialize host/service state
        #--------------------------------------------------------------
        self.scheduler_loop(1, [[host, 0, 'UP'], [svc, 0, 'OK']],
                            do_sleep=True, sleep_time=0.1)
        self.assertEqual(svc.current_notification_number, 0)
        #--------------------------------------------------------------
        # service reaches hard;2
        # a notification must have been created
        # notification number must be 1
        #--------------------------------------------------------------
        self.scheduler_loop(2, [[host, 0, 'UP'], [svc, 2, 'BAD']],
                            do_sleep=True, sleep_time=0.1)
        self.assertTrue(self.log_match(1, 'SERVICE ALERT.*;CRITICAL;SOFT'))
        self.assertTrue(self.log_match(2, 'SERVICE EVENT HANDLER.*;CRITICAL;SOFT'))
        self.assertTrue(self.log_match(3, 'SERVICE ALERT.*;CRITICAL;HARD'))
        self.assertTrue(self.log_match(4, 'SERVICE EVENT HANDLER.*;CRITICAL;HARD'))
        self.assertTrue(self.log_match(5, 'SERVICE NOTIFICATION.*;CRITICAL;'))
        self.assertEqual(svc.current_notification_number, 1)
        #--------------------------------------------------------------
        # reset host/service state
        #--------------------------------------------------------------
        self.scheduler_loop(1, [[host, 0, 'UP'], [svc, 0, 'OK']],
                            do_sleep=True, sleep_time=0.1)
        self.assertEqual(svc.current_notification_number, 0)
        # fixed downtime valid for the next 5 minutes
        duration = 300
        now = time.time()
        # fields: entry time, start, end, fixed=1, trigger_id=0, duration
        cmd = "[%lu] SCHEDULE_HOST_DOWNTIME;test_host_0;%d;%d;1;0;%d;lausser;blablub" % (
            now, now, now + duration, duration)
        self.sched.run_external_command(cmd)
        #--------------------------------------------------------------
        # service reaches hard;2
        # the downtime start is notified for the host, but no service
        # notification may appear
        #--------------------------------------------------------------
        self.scheduler_loop(2, [[host, 0, 'UP'], [svc, 2, 'BAD']],
                            do_sleep=True, sleep_time=0.1)
        self.assertTrue(self.any_log_match('HOST NOTIFICATION.*;DOWNTIMESTART'))
        self.assertFalse(self.any_log_match('SERVICE NOTIFICATION.*;CRITICAL;'))
        self.show_and_clear_logs()
        self.show_and_clear_actions()

    def test_only_notified_contacts_notifications(self):
        """A contact that never got the problem notification (UNKNOWN is
        filtered out for it) must not get the recovery notification."""
        # critical notification
        # run loop -> another notification
        host, svc = self._prepare_host_and_service()
        # To make tests quicker we make notifications send very quickly
        svc.notification_interval = 0.001
        # We do not want the contact to get a mail for UNKNOWN states,
        # so we remove the 'u' option
        test_contact = self.sched.contacts.find_by_name('test_contact')
        for nw in test_contact.notificationways:
            nw.service_notification_options.remove('u')
        #--------------------------------------------------------------
        # initialize host/service state
        #--------------------------------------------------------------
        self.scheduler_loop(1, [[host, 0, 'UP']], do_sleep=True, sleep_time=0.1)
        self._trace("- 1 x OK -------------------------------------")
        self.scheduler_loop(1, [[svc, 0, 'OK']], do_sleep=True, sleep_time=0.1)
        self.assertEqual(svc.current_notification_number, 0)
        #--------------------------------------------------------------
        # service reaches soft;1
        # there must not be any notification
        #--------------------------------------------------------------
        self._trace("- 1 x BAD get soft -------------------------------------")
        self.scheduler_loop(1, [[svc, 3, 'UNKNOWN']], do_sleep=True, sleep_time=0.1)
        # check_notification: not (soft)
        self._trace("---current_notification_number",
                    svc.current_notification_number)
        self._trace("Contact we notified", svc.notified_contacts)
        #--------------------------------------------------------------
        # service reaches hard;2
        # a notification is created, but the contact filters it out
        #--------------------------------------------------------------
        self._trace("- 1 x BAD get hard -------------------------------------")
        self.scheduler_loop(1, [[svc, 3, 'UNKNOWN']], do_sleep=True, sleep_time=0.1)
        self.show_and_clear_logs()
        self._trace("notif in progress", svc.notifications_in_progress)
        for n in svc.notifications_in_progress.values():
            self._trace("TOTO", n.__dict__)
        # check_notification: yes (hard)
        self._trace("---current_notification_number",
                    svc.current_notification_number)
        # The contact refuses our notification, so we are still at 0
        self.assertEqual(svc.current_notification_number, 0)
        self._trace("---------------------------------1st round with a hard")
        self._trace("find a way to get the number of the last reaction")
        cnn = svc.current_notification_number
        self._trace("- 5 x BAD repeat -------------------------------------")
        self.scheduler_loop(1, [[svc, 3, 'BAD']], do_sleep=True, sleep_time=0.1)
        self.show_and_clear_logs()
        self._trace("cnn and cur", cnn, svc.current_notification_number)
        cnn = svc.current_notification_number
        self.scheduler_loop(1, [[svc, 3, 'BAD']], do_sleep=True, sleep_time=0.1)
        self.show_and_clear_logs()
        self._trace("svc.current_notification_number, cnn",
                    svc.current_notification_number, cnn)
        #--------------------------------------------------------------
        # 2 more cycles (numbers are only traced here, not asserted)
        #--------------------------------------------------------------
        cnn = svc.current_notification_number
        self.scheduler_loop(2, [[svc, 3, 'BAD']], do_sleep=True, sleep_time=0.1)
        self.show_and_clear_logs()
        self._trace("svc.current_notification_number, cnn",
                    svc.current_notification_number, cnn)
        #--------------------------------------------------------------
        # another cycle with test_contact's notifications disabled
        # we do not raise current_notification_number if no mail was sent
        #--------------------------------------------------------------
        self._set_contact_svc_notifications(False)
        cnn = svc.current_notification_number
        self.scheduler_loop(1, [[svc, 3, 'BAD']], do_sleep=True, sleep_time=0.1)
        self.show_and_clear_logs()
        self.assertEqual(svc.current_notification_number, cnn)
        #--------------------------------------------------------------
        # again a normal cycle with notifications enabled for the contact
        #--------------------------------------------------------------
        self._set_contact_svc_notifications(True)
        self.scheduler_loop(1, [[svc, 3, 'BAD']], do_sleep=True, sleep_time=0.1)
        self.show_and_clear_logs()
        self._trace("svc.current_notification_number, cnn",
                    svc.current_notification_number, cnn)
        #--------------------------------------------------------------
        # now recover. there must be no scheduled/inpoller notification
        #--------------------------------------------------------------
        self.scheduler_loop(1, [[svc, 0, 'GOOD']], do_sleep=True, sleep_time=0.1)
        # I do not want a notification of a recovery because
        # the user did not have the notif first!
        self.assertFalse(self.any_log_match('notify-service'))
        self.show_and_clear_logs()
        self.show_and_clear_actions()
        self.assertEqual(svc.current_notification_number, 0)
482 if __name__
== '__main__':