Merge branch 'master' of ssh://naparuba@shinken.git.sourceforge.net/gitroot/shinken...
[shinken.git] / test / test_npcdmod.py
blob2434f8d8de318636879541c1f0d8ecacf5250105
1 #!/usr/bin/env python2.6
2 #Copyright (C) 2009-2010 :
3 # Gabes Jean, naparuba@gmail.com
4 # Gerhard Lausser, Gerhard.Lausser@consol.de
6 #This file is part of Shinken.
8 #Shinken is free software: you can redistribute it and/or modify
9 #it under the terms of the GNU Affero General Public License as published by
10 #the Free Software Foundation, either version 3 of the License, or
11 #(at your option) any later version.
13 #Shinken is distributed in the hope that it will be useful,
14 #but WITHOUT ANY WARRANTY; without even the implied warranty of
15 #MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 #GNU Affero General Public License for more details.
18 #You should have received a copy of the GNU Affero General Public License
19 #along with Shinken. If not, see <http://www.gnu.org/licenses/>.
23 # This file is used to test the npcd broker module
# Star import: supplies ShinkenTest and, presumably, the os / sys / time /
# unittest names this file uses without importing them -- TODO confirm.
from shinken_test import *

# The npcd broker module is imported straight from the modules directory.
# The path is relative, so it is resolved against the current working
# directory: run this file from the test directory.
sys.path.append("../shinken/modules")
from npcdmod_broker import Npcd_broker

# Raise the interpreter "check interval", i.e. how often it looks for thread
# switches and pending signal handlers (Python 2 only API).
sys.setcheckinterval(10000)
class TestConfig(ShinkenTest):
    """Test case for the npcd broker module.

    Check results are faked on a host, a router and a service, the
    resulting broks are handed to an ``Npcd_broker`` instance, and the test
    verifies that the perfdata file shows up on disk.

    ``self.sched`` and ``self.broks`` are not created here; presumably the
    ``ShinkenTest`` base class provides them -- TODO confirm.

    Output is produced with single-argument ``print(...)`` calls so that the
    text is the same as with the Python 2 print statement.
    """

    def add(self, b):
        """Store brok *b* in ``self.broks`` under its ``id``."""
        self.broks[b.id] = b

    def fake_check(self, ref, exit_status, output="OK"):
        """Schedule a check on *ref* and fake its execution.

        :param ref: object to check; must offer ``schedule()`` and an
            ``actions`` list.
        :param exit_status: exit status to store on the check.
        :param output: plugin output to store on the check.
        """
        print("fake %s" % (ref,))
        now = time.time()
        ref.schedule()
        # Take the last entry of ref.actions (presumably the check that
        # schedule() just queued) and register it with the scheduler.
        check = ref.actions.pop()
        self.sched.add(check)  # check is now in sched.checks[]
        # Fill in the fields a real execution would have set.
        check.check_time = now
        check.output = output
        check.exit_status = exit_status
        check.execution_time = 0.001
        check.status = 'waitconsume'
        self.sched.waiting_results.append(check)

    def scheduler_loop(self, count, reflist):
        """Run *count* fake scheduling rounds over *reflist*.

        :param count: number of rounds to run.
        :param reflist: sequence of ``(obj, exit_status, output)`` triples;
            every round fakes one check per triple.
        """
        for obj, _, _ in reflist:
            obj.checks_in_progress = []
        for loop in range(1, count + 1):
            print("processing check %s" % (loop,))
            for obj, exit_status, output in reflist:
                obj.update_in_checking()
                self.fake_check(obj, exit_status, output)
            self.sched.consume_results()
            self.worker_loop()
            # Reset the in-progress lists before the next round.
            for obj, _, _ in reflist:
                obj.checks_in_progress = []
            self.sched.update_downtimes_and_comments()

    def worker_loop(self):
        """Fake the workers: return every runnable action as successful."""
        self.sched.delete_zombie_checks()
        self.sched.delete_zombie_actions()
        # The returned checks are not used here; the call is kept in case it
        # changes scheduler state -- TODO confirm whether it is needed.
        self.sched.get_to_run_checks(True, False)
        actions = self.sched.get_to_run_checks(False, True)
        for a in actions:
            a.status = 'inpoller'
            a.exit_status = 0
            self.sched.put_results(a)

    def update_broker(self):
        """Pass every pending scheduler brok to the npcd module, then
        empty the scheduler's brok dictionary."""
        for brok in self.sched.broks.values():
            self.npcdmod_broker.manage_brok(brok)
        self.sched.broks = {}

    def print_header(self):
        """Print an 80-column banner with the test id centred in it."""
        print("#" * 80 + "\n" + "#" + " " * 78 + "#")
        print("#" + self.id().center(78) + "#")
        print("#" + " " * 78 + "#\n" + "#" * 80 + "\n")

    def write_correct_config(self):
        """Write ``npcd.cfg`` in the current directory with all three
        perfdata settings."""
        # One setting per line: without the newlines the file would hold a
        # single run-together line. The with-block closes the file even if a
        # write fails.
        with open("npcd.cfg", "w") as cfg:
            cfg.write("perfdata_file = /tmp/pfnerf\n")
            cfg.write("perfdata_spool_dir = /tmp/pnp4shinken/var/perfdata\n")
            cfg.write("perfdata_spool_filename=pferf\n")

    def write_incomplete_config(self):
        """Write ``npcd.cfg`` in the current directory without the
        ``perfdata_spool_dir`` setting."""
        with open("npcd.cfg", "w") as cfg:
            cfg.write("perfdata_file = /tmp/pfnerf\n")
            cfg.write("perfdata_spool_filename=pferf\n")

    def test_write_perfdata_file(self):
        # Deliberately a comment, not a docstring: unittest would print a
        # docstring's first line instead of the test name.
        #
        # Fake two rounds of checks with perfdata in their output, feed the
        # broks to the npcd module and expect ./perfdata to exist.
        self.print_header()
        # Start from a clean state so the final assertion is meaningful.
        if os.path.exists("./perfdata"):
            os.unlink("./perfdata")

        self.npcdmod_broker = Npcd_broker('npcd', None, './perfdata', '.', 'perfdata-target', 15)
        self.npcdmod_broker.properties = {
            'to_queue': 0,
        }
        self.npcdmod_broker.init()
        self.sched.fill_initial_broks()

        print("got initial broks")
        host = self.sched.hosts.find_by_name("test_host_0")
        host.checks_in_progress = []
        host.act_depend_of = []  # ignore the router
        router = self.sched.hosts.find_by_name("test_router_0")
        router.checks_in_progress = []
        router.act_depend_of = []  # ignore the router
        svc = self.sched.services.find_srv_by_name_and_hostname("test_host_0", "test_ok_0")
        svc.checks_in_progress = []
        svc.act_depend_of = []  # no hostchecks on critical checkresults
        self.scheduler_loop(2, [
            [host, 0, 'UP | value1=1 value2=2'],
            [router, 0, 'UP | rtt=10'],
            [svc, 2, 'BAD | value1=0 value2=0'],
        ])
        self.update_broker()
        self.assertTrue(os.path.exists("./perfdata"))
        # Leave no artefact behind for later runs.
        if os.path.exists("./perfdata"):
            os.unlink("./perfdata")
if __name__ == '__main__':
    # Run the tests when this file is executed directly.
    #
    # To profile the run instead, replace the call below with:
    #   import cProfile
    #   cProfile.runctx("unittest.main()", globals(), locals(),
    #                   filename="Thruk.profile")
    unittest.main()