Add: set 'None' as the default poller_tag value, so a poller can get untagged AND...
[shinken.git] / shinken / daemons / schedulerdaemon.py
blob 0bd37aeeae3cb27973b6da91d1c8121076e7ae2e
1 #!/usr/bin/env python
2 #Copyright (C) 2009-2010 :
3 # Gabes Jean, naparuba@gmail.com
4 # Gerhard Lausser, Gerhard.Lausser@consol.de
5 # Gregory Starck, g.starck@gmail.com
6 # Hartmut Goebel, h.goebel@goebel-consult.de
8 #This file is part of Shinken.
10 #Shinken is free software: you can redistribute it and/or modify
11 #it under the terms of the GNU Affero General Public License as published by
12 #the Free Software Foundation, either version 3 of the License, or
13 #(at your option) any later version.
15 #Shinken is distributed in the hope that it will be useful,
16 #but WITHOUT ANY WARRANTY; without even the implied warranty of
17 #MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 #GNU Affero General Public License for more details.
20 #You should have received a copy of the GNU Affero General Public License
21 #along with Shinken. If not, see <http://www.gnu.org/licenses/>.
23 import os
24 import sys
25 import random
26 import time
28 from shinken.scheduler import Scheduler
29 from shinken.objects import Config
30 from shinken.macroresolver import MacroResolver
31 from shinken.external_command import ExternalCommandManager
32 from shinken.daemon import Daemon
33 from shinken.property import PathProp, IntegerProp
34 import shinken.pyro_wrapper as pyro
35 from shinken.log import logger
36 from shinken.satellite import BaseSatellite, IForArbiter as IArb, Interface
# Interface for Workers
class IChecks(Interface):
    """ Interface for Workers:
    They connect here and see if they are still OK with our running_id,
    if not, they must drop their checks """

    # poller or reactionner is asking us our running_id
    def get_running_id(self):
        """Return our running_id.

        NOTE(review): running_id is not set in this class; it is presumably
        provided by the Interface base class -- confirm there.
        """
        return self.running_id

    # poller or reactionner ask us actions
    def get_checks(self, do_checks=False, do_actions=False, poller_tags=None, worker_name='none'):
        """Hand out actions to run to a worker (poller or reactionner).

        :param do_checks: forwarded to self.app.get_to_run_checks
        :param do_actions: forwarded to self.app.get_to_run_checks
        :param poller_tags: list of poller tags forwarded to
            self.app.get_to_run_checks. None (the default) means an empty
            list; a None sentinel is used instead of a ``[]`` default so
            that no single list object is shared between calls.
        :param worker_name: name of the requesting worker
        :return: the result of self.app.get_to_run_checks; its length is
            added to self.app.nb_checks_send
        """
        if poller_tags is None:
            poller_tags = []
        res = self.app.get_to_run_checks(do_checks, do_actions, poller_tags, worker_name)
        self.app.nb_checks_send += len(res)
        return res

    # poller or reactionner are putting us results
    def put_results(self, results):
        """Queue results sent back by a worker.

        The results are appended to self.app.waiting_results; they are not
        processed here.

        :param results: sequence of results from the worker
        :return: True
        """
        nb_received = len(results)
        self.app.nb_check_received += nb_received
        print("Received %d results" % nb_received)
        self.app.waiting_results.extend(results)
        return True
class IBroks(Interface):
    """Interface for Brokers.

    Brokers connect here to fetch broks (the data brokers consume). Broks
    must be delivered in ORDER: initial status BEFORE any update.
    """

    def get_broks(self):
        """Return the pending broks to a broker and count them as sent.

        Once they are handed out, the app is flagged as no longer holding a
        full set of broks (has_full_broks = False).
        """
        app = self.app
        broks = app.get_broks()
        app.nb_broks_send += len(broks)
        # What we hold now is no longer a full (initial) brok set
        app.has_full_broks = False
        return broks

    def fill_initial_broks(self):
        """Prepare broks for a new broker.

        If the app does not already hold a full set of broks, its brok
        queue is cleared and refilled through app.fill_initial_broks().
        """
        app = self.app
        if app.has_full_broks:
            return
        app.broks.clear()
        app.fill_initial_broks()
class IForArbiter(IArb):
    """Interface for the Arbiter, our big MASTER.

    The arbiter gives us a configuration and then we keep listening to it.
    It receives the user input, so we must listen to it carefully and give
    it the information it wants (maybe on behalf of another scheduler).
    """

    def run_external_command(self, command):
        """Apply an external command sent by the arbiter.

        The command may be global or specific to us; either way it is
        handed to our scheduler.
        """
        sched = self.app.sched
        sched.run_external_command(command)

    def put_conf(self, conf):
        """Receive a new configuration from the arbiter.

        The running scheduler is stopped first, then the base interface's
        put_conf takes over.
        """
        scheduler = self.app.sched
        scheduler.die()
        super(IForArbiter, self).put_conf(conf)

    def wait_new_conf(self):
        """Stop working and go back to waiting for a configuration.

        Called by the arbiter when it thinks we are running but must not
        (e.g. we are a spare that took a conf and the master came back).
          Us      : No please...
          Arbiter : I don't care, hasta la vista baby!
        """
        print("Arbiter want me to wait a new conf")
        scheduler = self.app.sched
        scheduler.die()
        super(IForArbiter, self).wait_new_conf()
# The main app class
class Shinken(BaseSatellite):
    """The scheduler daemon.

    main() registers the arbiter interface ("ForArbiter") on the Pyro daemon
    and enters the main loop. Each loop turn waits for a configuration from
    the arbiter. Once one arrives, setup_new_conf() registers the "Checks"
    (workers) and "Broks" (brokers) interfaces, and then the Scheduler is run.
    """

    properties = BaseSatellite.properties.copy()
    properties.update({
        'pidfile': PathProp(default='/usr/local/shinken/var/schedulerd.pid'),
        'port': IntegerProp(default='7768'),
        'local_log': PathProp(default='/usr/local/shinken/var/schedulerd.log'),
    })

    def __init__(self, config_file, is_daemon, do_replace, debug, debug_file):
        """Create the daemon, its arbiter interface and its Scheduler.

        The Checks/Broks interfaces are only created once a first
        configuration has been received (see setup_new_conf).
        """
        BaseSatellite.__init__(self, 'scheduler', config_file, is_daemon, do_replace, debug, debug_file)

        self.interface = IForArbiter(self)
        self.sched = Scheduler(self)

        # Pyro interfaces for workers (Checks) and brokers (Broks);
        # they stay None until a configuration has been loaded.
        self.ichecks = None
        self.ibroks = None
        self.must_run = True

        # URIs of the registered interfaces
        self.uri = None
        self.uri2 = None

        # Links to satellites; for now only pollers are filled from the conf
        self.pollers = {}
        self.reactionners = {}

    def do_stop(self):
        """Unregister the Checks/Broks interfaces, if any, then stop."""
        # Both interfaces are None until setup_new_conf has run: do not hand
        # None to the Pyro daemon if we are stopped before the first conf.
        if self.ibroks is not None:
            self.pyro_daemon.unregister(self.ibroks)
        if self.ichecks is not None:
            self.pyro_daemon.unregister(self.ichecks)
        super(Shinken, self).do_stop()

    def compensate_system_time_change(self, difference):
        """Compensate a system time change of `difference` for all
        hosts/services/checks/notifications.

        Only checks and actions whose status is still 'scheduled' are moved;
        ones that are already launched are not touched. When a moved item no
        longer has any valid time left in its timeperiod, it is turned into
        an error result (see _mark_no_valid_time).
        """
        logger.log('Warning: A system time change of %d has been detected. Compensating...' % difference)
        # We only need to change some value
        # NOTE(review): program_start is not set in this class; it is
        # presumably initialised by a base class -- confirm.
        self.program_start = max(0, self.program_start + difference)

        # Then we compensate all hosts/services
        for h in self.sched.hosts:
            h.compensate_system_time_change(difference)
        for s in self.sched.services:
            s.compensate_system_time_change(difference)

        # Now all checks. Already launched ones must not be touched, and a
        # check with no planned time has nothing to shift.
        for c in self.sched.checks.values():
            if c.status != 'scheduled' or c.t_to_go is None:
                continue
            ref = c.ref
            new_t = max(0, c.t_to_go + difference)
            # The new time must also match the check timeperiod; without
            # one, any time is acceptable.
            if ref.check_period is not None:
                new_t = ref.check_period.get_next_valid_time_from_t(new_t)
            if new_t is None:
                self._mark_no_valid_time(c)
            else:
                c.t_to_go = new_t
                ref.next_chk = new_t

        # Now all actions (notifications, event handlers, ...)
        for a in self.sched.actions.values():
            if a.status != 'scheduled' or a.t_to_go is None:
                continue
            new_t = max(0, a.t_to_go + difference)
            # Notifications must match their notification_period and also
            # carry a creation_time that has to be shifted.
            if a.is_a == 'notification':
                # Event handlers do not have a ref
                ref = getattr(a, 'ref', None)
                if ref is not None and ref.notification_period is not None:
                    new_t = ref.notification_period.get_next_valid_time_from_t(new_t)
                a.creation_time = a.creation_time + difference
            if new_t is None:
                self._mark_no_valid_time(a)
            else:
                a.t_to_go = new_t

    @staticmethod
    def _mark_no_valid_time(action):
        """Turn a check/action that has no valid time left after a time
        change into an error result (exit status 2, explanatory output)."""
        # NOTE(review): callers select items on `status` ('scheduled'), but
        # this writes 'waitconsume' into `state`, exactly as the original
        # code did. If the intent is for the scheduler to consume the error,
        # this probably should be `status` -- confirm against the Scheduler
        # (especially for notifications) before changing it.
        action.state = 'waitconsume'
        action.exit_status = 2
        action.output = '(Error: there is no available check time after time change!)'
        action.check_time = time.time()
        action.execution_time = 0

    def manage_signal(self, sig, frame):
        """Stop the scheduler, leave the run loop, then let
        Daemon.manage_signal handle the signal."""
        self.sched.die()
        self.must_run = False
        Daemon.manage_signal(self, sig, frame)

    def do_loop_turn(self):
        """Wait for a configuration; if one arrived, load it and run the
        scheduler."""
        # Ok, now the conf
        self.wait_for_initial_conf()
        if not self.new_conf:
            return
        print("Ok we've got conf")
        self.setup_new_conf()
        print("Configuration Loaded")
        self.sched.run()

    def setup_new_conf(self):
        """Load the configuration pushed by the arbiter.

        self.new_conf is unpacked as (conf, override_conf, modules,
        satellites) and then cleared. A conf with the same magic_hash as the
        current one is skipped. Otherwise the steps are:
          - pollers are recorded with their Pyro URI;
          - override_conf values are applied onto conf;
          - the timezone and modules are set up;
          - the Checks/Broks interfaces are (re)registered;
          - the scheduler is reset and loaded with the conf.
        """
        (conf, override_conf, modules, satellites) = self.new_conf
        self.new_conf = None

        if self.cur_conf and self.cur_conf.magic_hash == conf.magic_hash:
            print("I received a conf with same hash than me, I skip it.")
            return

        self.conf = conf
        self.cur_conf = conf
        self.override_conf = override_conf
        self.modules = modules
        self.satellites = satellites

        # Now we create our pollers. The poller dict coming from
        # `satellites` is stored as-is and enriched in place.
        for pol_id in satellites['pollers']:
            p = satellites['pollers'][pol_id]
            self.pollers[pol_id] = p
            p['uri'] = pyro.create_uri(p['address'], p['port'], 'Schedulers', self.use_ssl)
            p['last_connexion'] = 0
            print("Got a poller %s" % (p,))

        # First mix conf and override_conf to have our definitive conf
        for prop in self.override_conf:
            val = self.override_conf[prop]
            print("Overriding the property %s with value %s" % (prop, val))
            setattr(self.conf, prop, val)

        if self.conf.use_timezone != 'NOTSET':
            print("Setting our timezone to %s" % (self.conf.use_timezone,))
            os.environ['TZ'] = self.conf.use_timezone
            # time.tzset() is only available on Unix
            if hasattr(time, 'tzset'):
                time.tzset()

        print("I've got modules %s" % (self.modules,))
        # TODO: if scheduler had previous modules instanciated it must clean them !
        self.modules_manager.set_modules(self.modules)
        self.do_load_modules()

        # Give the scheduler its Checks interface, removing any previous one
        if self.ichecks is not None:
            print("Deconnecting previous Check Interface from pyro_daemon")
            self.pyro_daemon.unregister(self.ichecks)
        self.ichecks = IChecks(self.sched)
        self.uri = self.pyro_daemon.register(self.ichecks, "Checks")
        print("The Checks Interface uri is: %s" % (self.uri,))

        # Same for Broks
        if self.ibroks is not None:
            print("Deconnecting previous Broks Interface from pyro_daemon")
            self.pyro_daemon.unregister(self.ibroks)
        self.ibroks = IBroks(self.sched)
        self.uri2 = self.pyro_daemon.register(self.ibroks, "Broks")
        print("The Broks Interface uri is: %s" % (self.uri2,))

        print("Loading configuration..")
        self.conf.explode_global_conf()

        # We give sched its conf
        self.sched.reset()
        self.sched.load_conf(self.conf)
        self.sched.load_satellites(self.pollers, self.reactionners)

        # We must update our Config dict macro with good value
        # from the config parameters
        self.sched.conf.fill_resource_macros_names_macros()
        print("DBG: got macors %s" % (self.sched.conf.macros,))

        # Creating the Macroresolver Class & unique instance
        m = MacroResolver()
        m.init(self.conf)

        # Now create the external commander. It is an 'applyer': its role is
        # not to dispatch commands, but to apply them.
        e = ExternalCommandManager(self.conf, 'applyer')

        # Scheduler need to know about external command to
        # activate it if necessary
        self.sched.load_external_command(e)

        # External command need the sched because it can raise checks
        e.load_scheduler(self.sched)

    def main(self):
        """Daemon entry point.

        Loads the config file, starts the daemon, registers the arbiter
        interface ("ForArbiter") and enters the main loop.
        """
        self.load_config_file()

        self.do_daemon_init_and_start()
        self.uri2 = self.pyro_daemon.register(self.interface, "ForArbiter")
        print("The Arbiter Interface is at: %s" % (self.uri2,))

        self.do_mainloop()