Merge branch 'master' of ssh://lausser,shinken@shinken.git.sourceforge.net/gitroot...
[shinken.git] / test / test_npcdmod.py
blobf5a113534f5e77ca84488738c6787145c8c72faf
1 #!/usr/bin/env python2.6
2 #Copyright (C) 2009-2010 :
3 # Gabes Jean, naparuba@gmail.com
4 # Gerhard Lausser, Gerhard.Lausser@consol.de
6 #This file is part of Shinken.
8 #Shinken is free software: you can redistribute it and/or modify
9 #it under the terms of the GNU Affero General Public License as published by
10 #the Free Software Foundation, either version 3 of the License, or
11 #(at your option) any later version.
13 #Shinken is distributed in the hope that it will be useful,
14 #but WITHOUT ANY WARRANTY; without even the implied warranty of
15 #MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 #GNU Affero General Public License for more details.
18 #You should have received a copy of the GNU Affero General Public License
19 #along with Shinken. If not, see <http://www.gnu.org/licenses/>.
23 # This file is used to test the npcd broker module
26 import os, sys, string, time
28 from shinken_test import unittest, ShinkenTest
30 from shinken.objects.module import Module
32 from shinken.modules import npcdmod_broker
33 from shinken.modules.npcdmod_broker import Npcd_broker
36 sys.setcheckinterval(10000)
38 modconf = Module()
39 modconf.module_name = "ncpd"
40 modconf.module_type = npcdmod_broker.properties['type']
41 modconf.properties = npcdmod_broker.properties.copy()
45 class TestConfig(ShinkenTest):
48 def add(self, b):
49 self.broks[b.id] = b
52 def fake_check(self, ref, exit_status, output="OK"):
53 print "fake", ref
54 now = time.time()
55 ref.schedule()
56 check = ref.actions.pop()
57 self.sched.add(check) # check is now in sched.checks[]
58 # fake execution
59 check.check_time = now
60 check.output = output
61 check.exit_status = exit_status
62 check.execution_time = 0.001
63 check.status = 'waitconsume'
64 self.sched.waiting_results.append(check)
67 def scheduler_loop(self, count, reflist):
68 for ref in reflist:
69 (obj, exit_status, output) = ref
70 obj.checks_in_progress = []
71 for loop in range(1, count + 1):
72 print "processing check", loop
73 for ref in reflist:
74 (obj, exit_status, output) = ref
75 obj.update_in_checking()
76 self.fake_check(obj, exit_status, output)
77 self.sched.consume_results()
78 self.worker_loop()
79 for ref in reflist:
80 (obj, exit_status, output) = ref
81 obj.checks_in_progress = []
82 self.sched.update_downtimes_and_comments()
83 #time.sleep(ref.retry_interval * 60 + 1)
84 #time.sleep(60 + 1)
87 def worker_loop(self):
88 self.sched.delete_zombie_checks()
89 self.sched.delete_zombie_actions()
90 checks = self.sched.get_to_run_checks(True, False)
91 actions = self.sched.get_to_run_checks(False, True)
92 #print "------------ worker loop checks ----------------"
93 #print checks
94 #print "------------ worker loop actions ----------------"
95 #self.show_actions()
96 #print "------------ worker loop new ----------------"
97 for a in actions:
98 #print "---> fake return of action", a.id
99 a.status = 'inpoller'
100 a.exit_status = 0
101 self.sched.put_results(a)
102 #self.show_actions()
103 #print "------------ worker loop end ----------------"
106 def update_broker(self):
107 for brok in self.sched.broks.values():
108 self.npcdmod_broker.manage_brok(brok)
109 self.sched.broks = {}
112 def print_header(self):
113 print "#" * 80 + "\n" + "#" + " " * 78 + "#"
114 print "#" + string.center(self.id(), 78) + "#"
115 print "#" + " " * 78 + "#\n" + "#" * 80 + "\n"
118 def write_correct_config(self):
119 file = open("npcd.cfg", "w")
120 file.write("perfdata_file = /tmp/pfnerf")
121 file.write("perfdata_spool_dir = /tmp/pnp4shinken/var/perfdata")
122 file.write("perfdata_spool_filename=pferf")
123 file.close()
126 def write_incomplete_config(self):
127 file = open("npcd.cfg", "w")
128 file.write("perfdata_file = /tmp/pfnerf")
129 file.write("perfdata_spool_filename=pferf")
130 file.close()
133 def test_write_perfdata_file(self):
134 self.print_header()
135 if os.path.exists("./perfdata"):
136 os.unlink("./perfdata")
138 self.npcdmod_broker = Npcd_broker(modconf, None, './perfdata', '.', 'perfdata-target', 15)
139 self.npcdmod_broker.properties['to_queue'] = 0
141 self.npcdmod_broker.init()
142 self.sched.fill_initial_broks()
144 print "got initial broks"
145 now = time.time()
146 host = self.sched.hosts.find_by_name("test_host_0")
147 host.checks_in_progress = []
148 host.act_depend_of = [] # ignore the router
149 router = self.sched.hosts.find_by_name("test_router_0")
150 router.checks_in_progress = []
151 router.act_depend_of = [] # ignore the router
152 svc = self.sched.services.find_srv_by_name_and_hostname("test_host_0", "test_ok_0")
153 svc.checks_in_progress = []
154 svc.act_depend_of = [] # no hostchecks on critical checkresults
155 self.scheduler_loop(2, [[host, 0, 'UP | value1=1 value2=2'], [router, 0, 'UP | rtt=10'], [svc, 2, 'BAD | value1=0 value2=0']])
156 self.update_broker()
157 self.assert_(os.path.exists("./perfdata"))
158 if os.path.exists("./perfdata"):
159 self.npcdmod_broker.logfile.close()
160 os.unlink("./perfdata")
164 if __name__ == '__main__':
165 import cProfile
166 command = """unittest.main()"""
167 unittest.main()
168 #cProfile.runctx( command, globals(), locals(), filename="Thruk.profile" )