2 #Copyright (C) 2009-2010 :
3 # Gabes Jean, naparuba@gmail.com
4 # Gerhard Lausser, Gerhard.Lausser@consol.de
5 # Gregory Starck, g.starck@gmail.com
6 # Hartmut Goebel, h.goebel@goebel-consult.de
8 #This file is part of Shinken.
10 #Shinken is free software: you can redistribute it and/or modify
11 #it under the terms of the GNU Affero General Public License as published by
12 #the Free Software Foundation, either version 3 of the License, or
13 #(at your option) any later version.
15 #Shinken is distributed in the hope that it will be useful,
16 #but WITHOUT ANY WARRANTY; without even the implied warranty of
17 #MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 #GNU Affero General Public License for more details.
20 #You should have received a copy of the GNU Affero General Public License
21 #along with Shinken. If not, see <http://www.gnu.org/licenses/>.
28 from shinken
.scheduler
import Scheduler
29 from shinken
.objects
import Config
30 from shinken
.macroresolver
import MacroResolver
31 from shinken
.external_command
import ExternalCommandManager
32 from shinken
.daemon
import Daemon
33 from shinken
.property import PathProp
, IntegerProp
34 import shinken
.pyro_wrapper
as pyro
35 from shinken
.log
import logger
36 from shinken
.satellite
import BaseSatellite
, IForArbiter
as IArb
, Interface
38 #Interface for Workers
40 class IChecks(Interface
):
41 """ Interface for Workers:
42 They connect here and see if they are still OK with our running_id, if not, they must drop their checks """
44 # poller or reactionner is asking us our running_id
45 def get_running_id(self
):
46 return self
.running_id
49 # poller or reactionner ask us actions
50 def get_checks(self
, do_checks
=False, do_actions
=False, poller_tags
=[], worker_name
='none'):
51 #print "We ask us checks"
52 res
= self
.app
.get_to_run_checks(do_checks
, do_actions
, poller_tags
, worker_name
)
53 #print "Sending %d checks" % len(res)
54 self
.app
.nb_checks_send
+= len(res
)
57 # poller or reactionner are putting us results
58 def put_results(self
, results
):
59 nb_received
= len(results
)
60 self
.app
.nb_check_received
+= nb_received
61 print "Received %d results" % nb_received
62 self
.app
.waiting_results
.extend(results
)
65 #self.sched.put_results(c)
69 class IBroks(Interface
):
70 """ Interface for Brokers:
71 They connect here and get all broks (data for brokers). datas must be ORDERED! (initial status BEFORE uodate...) """
73 # poller or reactionner ask us actions
75 #print "We ask us broks"
76 res
= self
.app
.get_broks()
77 #print "Sending %d broks" % len(res)#, res
78 self
.app
.nb_broks_send
+= len(res
)
79 #we do not more have a full broks in queue
80 self
.app
.has_full_broks
= False
83 #A broker is a new one, if we do not have
84 #a full broks, we clean our broks, and
85 #fill it with all new values
86 def fill_initial_broks(self
):
87 if not self
.app
.has_full_broks
:
88 self
.app
.broks
.clear()
89 self
.app
.fill_initial_broks()
92 class IForArbiter(IArb
):
93 """ Interface for Arbiter, our big MASTER. We ask him a conf and after we listen for him.
94 HE got user entry, so we must listen him carefully and give information he want, maybe for another scheduler """
96 #arbiter is send us a external coomand.
97 #it can send us global command, or specific ones
98 def run_external_command(self
, command
):
99 self
.app
.sched
.run_external_command(command
)
101 def put_conf(self
, conf
):
103 super(IForArbiter
, self
).put_conf(conf
)
105 #Call by arbiter if it thinks we are running but we must do not (like
106 #if I was a spare that take a conf but the master returns, I must die
107 #and wait a new conf)
109 #Arbiter : I don't care, hasta la vista baby!
110 #Us : ... <- Nothing! We are die! you don't follow
112 def wait_new_conf(self
):
113 print "Arbiter want me to wait a new conf"
115 super(IForArbiter
, self
).wait_new_conf()
119 class Shinken(BaseSatellite
):
121 properties
= BaseSatellite
.properties
.copy()
123 'pidfile': PathProp(default
='/usr/local/shinken/var/schedulerd.pid'),
124 'port': IntegerProp(default
='7768'),
125 'local_log': PathProp(default
='/usr/local/shinken/var/schedulerd.log'),
129 #Create the shinken class:
130 #Create a Pyro server (port = arvg 1)
131 #then create the interface for arbiter
132 #Then, it wait for a first configuration
133 def __init__(self
, config_file
, is_daemon
, do_replace
, debug
, debug_file
):
135 BaseSatellite
.__init
__(self
, 'scheduler', config_file
, is_daemon
, do_replace
, debug
, debug_file
)
137 self
.interface
= IForArbiter(self
)
138 self
.sched
= Scheduler(self
)
148 # And possible links for satellites
149 # from now only pollers
151 self
.reactionners
= {}
155 self
.pyro_daemon
.unregister(self
.ibroks
)
156 self
.pyro_daemon
.unregister(self
.ichecks
)
157 super(Shinken
, self
).do_stop()
160 def compensate_system_time_change(self
, difference
):
161 """ Compensate a system time change of difference for all hosts/services/checks/notifs """
162 logger
.log('Warning: A system time change of %d has been detected. Compensating...' % difference
)
163 # We only need to change some value
164 self
.program_start
= max(0, self
.program_start
+ difference
)
166 # Then we compasate all host/services
167 for h
in self
.sched
.hosts
:
168 h
.compensate_system_time_change(difference
)
169 for s
in self
.sched
.services
:
170 s
.compensate_system_time_change(difference
)
172 # Now all checks and actions
173 for c
in self
.sched
.checks
.values():
174 # Already launch checks should not be touch
175 if c
.status
== 'scheduled':
178 new_t
= max(0, t_to_go
+ difference
)
179 # But it's no so simple, we must match the timeperiod
180 new_t
= ref
.check_period
.get_next_valid_time_from_t(new_t
)
181 # But maybe no there is no more new value! Not good :(
182 # Say as error, with error output
184 c
.state
= 'waitconsume'
186 c
.output
= '(Error: there is no available check time after time change!)'
187 c
.check_time
= time
.time()
193 # Now all checks and actions
194 for c
in self
.sched
.actions
.values():
195 # Already launch checks should not be touch
196 if c
.status
== 'scheduled':
199 # Event handler do not have ref
200 ref
= getattr(c
, 'ref', None)
201 new_t
= max(0, t_to_go
+ difference
)
203 # Notification should be check with notification_period
204 if c
.is_a
== 'notification':
205 # But it's no so simple, we must match the timeperiod
206 new_t
= ref
.notification_period
.get_next_valid_time_from_t(new_t
)
207 # And got a creation_time variable too
208 c
.creation_time
= c
.creation_time
+ difference
210 # But maybe no there is no more new value! Not good :(
211 # Say as error, with error output
213 c
.state
= 'waitconsume'
215 c
.output
= '(Error: there is no available check time after time change!)'
216 c
.check_time
= time
.time()
222 def manage_signal(self
, sig
, frame
):
224 self
.must_run
= False
225 Daemon
.manage_signal(self
, sig
, frame
)
228 def do_loop_turn(self
):
230 self
.wait_for_initial_conf()
231 if not self
.new_conf
:
233 print "Ok we've got conf"
234 self
.setup_new_conf()
235 print "Configuration Loaded"
239 def setup_new_conf(self
):
240 #self.use_ssl = self.app.use_ssl
241 (conf
, override_conf
, modules
, satellites
) = self
.new_conf
244 if self
.cur_conf
and self
.cur_conf
.magic_hash
== conf
.magic_hash
:
245 print("I received a conf with same hash than me, I skip it.")
250 self
.override_conf
= override_conf
251 self
.modules
= modules
252 self
.satellites
= satellites
253 #self.pollers = self.app.pollers
255 # Now We create our pollers
256 for pol_id
in satellites
['pollers']:
257 # Must look if we already have it
258 already_got
= pol_id
in self
.pollers
259 p
= satellites
['pollers'][pol_id
]
260 self
.pollers
[pol_id
] = p
261 uri
= pyro
.create_uri(p
['address'], p
['port'], 'Schedulers', self
.use_ssl
)
262 self
.pollers
[pol_id
]['uri'] = uri
263 self
.pollers
[pol_id
]['last_connexion'] = 0
264 print "Got a poller", p
266 #First mix conf and override_conf to have our definitive conf
267 for prop
in self
.override_conf
:
268 print "Overriding the property %s with value %s" % (prop
, self
.override_conf
[prop
])
269 val
= self
.override_conf
[prop
]
270 setattr(self
.conf
, prop
, val
)
272 if self
.conf
.use_timezone
!= 'NOTSET':
273 print "Setting our timezone to", self
.conf
.use_timezone
274 os
.environ
['TZ'] = self
.conf
.use_timezone
277 print "I've got modules", self
.modules
278 # TODO: if scheduler had previous modules instanciated it must clean them !
279 self
.modules_manager
.set_modules(self
.modules
)
280 self
.do_load_modules()
282 # give it an interface
283 # But first remove previous interface if exists
284 if self
.ichecks
is not None:
285 print "Deconnecting previous Check Interface from pyro_daemon"
286 self
.pyro_daemon
.unregister(self
.ichecks
)
287 #Now create and connect it
288 self
.ichecks
= IChecks(self
.sched
)
289 self
.uri
= self
.pyro_daemon
.register(self
.ichecks
, "Checks")
290 print "The Checks Interface uri is:", self
.uri
293 if self
.ibroks
is not None:
294 print "Deconnecting previous Broks Interface from pyro_daemon"
295 self
.pyro_daemon
.unregister(self
.ibroks
)
296 #Create and connect it
297 self
.ibroks
= IBroks(self
.sched
)
298 self
.uri2
= self
.pyro_daemon
.register(self
.ibroks
, "Broks")
299 print "The Broks Interface uri is:", self
.uri2
301 print("Loading configuration..")
302 self
.conf
.explode_global_conf()
304 #we give sched it's conf
306 self
.sched
.load_conf(self
.conf
)
307 self
.sched
.load_satellites(self
.pollers
, self
.reactionners
)
309 #We must update our Config dict macro with good value
310 #from the config parameters
311 self
.sched
.conf
.fill_resource_macros_names_macros()
312 print "DBG: got macors", self
.sched
.conf
.macros
314 #Creating the Macroresolver Class & unique instance
319 #self.conf.quick_debug()
321 #Now create the external commander
322 #it's a applyer : it role is not to dispatch commands,
324 e
= ExternalCommandManager(self
.conf
, 'applyer')
326 #Scheduler need to know about external command to
327 #activate it if necessery
328 self
.sched
.load_external_command(e
)
330 #External command need the sched because he can raise checks
331 e
.load_scheduler(self
.sched
)
334 # our main function, launch after the init
337 self
.load_config_file()
339 self
.do_daemon_init_and_start()
340 self
.uri2
= self
.pyro_daemon
.register(self
.interface
, "ForArbiter")
341 print "The Arbiter Interface is at:", self
.uri2