#!/usr/bin/env python
#Copyright (C) 2009-2010 :
#    Gabes Jean, naparuba@gmail.com
#    Gerhard Lausser, Gerhard.Lausser@consol.de
#
#This file is part of Shinken.
#
#Shinken is free software: you can redistribute it and/or modify
#it under the terms of the GNU Affero General Public License as published by
#the Free Software Foundation, either version 3 of the License, or
#(at your option) any later version.
#
#Shinken is distributed in the hope that it will be useful,
#but WITHOUT ANY WARRANTY; without even the implied warranty of
#MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
#GNU Affero General Public License for more details.
#
#You should have received a copy of the GNU Affero General Public License
#along with Shinken. If not, see <http://www.gnu.org/licenses/>.

#This class is an interface for reactionners and pollers.
#The satellite listens for its configuration from the Arbiter on a port.
#The configuration given by the Arbiter is the set of schedulers from which
#the satellite will take actions.
#Once launched with a conf, the satellite keeps listening to the Arbiter
#(with a timeout). If the Arbiter wants it to take a new conf, the satellite
#forgets its old schedulers (and the actions they sent), takes the new ones
#and does the (new) job.
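
#Rough data flow implemented below:
#  Arbiter --put_conf--> IForArbiter --> Satellite.schedulers
#  get_new_actions() pulls checks/actions from each scheduler, wraps them in
#  Message objects and pushes them on the self.s queue for the Workers;
#  Workers drop finished actions on returns_queue, then manage_action_return()
#  and manage_returns() send them back to their scheduler;
#  broks produced along the way are exposed to brokers through IBroks.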

from Queue import Empty
from multiprocessing import Queue, Manager, active_children
import os
import copy
import time
import sys
import select
import cPickle
import random

try:
    import shinken.pyro_wrapper as pyro
except ImportError:
    print "Shinken requires the Python Pyro module. Please install it."
    sys.exit(1)

Pyro = pyro.Pyro

from message import Message
from worker import Worker
from load import Load
from daemon import Daemon
from log import logger
from brok import Brok
from check import Check
from notification import Notification
from eventhandler import EventHandler


#Interface for the Arbiter, our big MASTER.
#It gives us our conf.
class IForArbiter(Pyro.core.ObjBase):
    #We keep the app link because we are just here for it
    def __init__(self, app):
        Pyro.core.ObjBase.__init__(self)
        self.app = app
        self.schedulers = app.schedulers

    #Function called by the Arbiter to give us our conf.
    #conf must be a dict with:
    #'schedulers' : schedulers dict (by id) with address and port
    #TODO: catch the case where the Arbiter sends something we already have
    #(same id+address+port) -> just do nothing :)
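    #Illustrative shape of such a conf dict (field names taken from the code
    #below, values made up for the example):
    #conf = {
    #    'global': {'poller_name': 'poller-1', 'max_workers': 4,
    #               'min_workers': 1, 'processes_by_worker': 256,
    #               'polling_interval': 1, 'use_timezone': 'NOTSET'},
    #    'schedulers': {0: {'name': 'scheduler-1', 'address': 'localhost',
    #                       'port': 7768, 'active': True}},
    #}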
    def put_conf(self, conf):
        self.app.have_conf = True
        self.app.have_new_conf = True
        #Get our name from the globals
        if 'poller_name' in conf['global']:
            self.name = conf['global']['poller_name']
        elif 'reactionner_name' in conf['global']:
            self.name = conf['global']['reactionner_name']
        else:
            self.name = 'Unnamed satellite'
        self.app.name = self.name

        print "[%s] Sending us a configuration %s " % (self.name, conf)
        #If we already know a scheduler, its conf is replaced,
        #but we keep its pending results (wait_homerun)
        for sched_id in conf['schedulers']:
            already_got = False
            if sched_id in self.schedulers:
                logger.log("[%s] We already got the conf %d (%s)" % (self.name, sched_id, conf['schedulers'][sched_id]['name']))
                already_got = True
                wait_homerun = self.schedulers[sched_id]['wait_homerun']
            s = conf['schedulers'][sched_id]
            self.schedulers[sched_id] = s

            uri = pyro.create_uri(s['address'], s['port'], 'Checks')
            self.schedulers[sched_id]['uri'] = uri
            if already_got:
                self.schedulers[sched_id]['wait_homerun'] = wait_homerun
            else:
                self.schedulers[sched_id]['wait_homerun'] = {}
            self.schedulers[sched_id]['running_id'] = 0
            self.schedulers[sched_id]['active'] = s['active']

            #And then we connect to it :)
            self.app.pynag_con_init(sched_id)

        #Now the limits part
        self.app.max_workers = conf['global']['max_workers']
        self.app.min_workers = conf['global']['min_workers']
        self.app.processes_by_worker = conf['global']['processes_by_worker']
        self.app.polling_interval = conf['global']['polling_interval']
        if 'poller_tags' in conf['global']:
            self.app.poller_tags = conf['global']['poller_tags']
        else: #a reactionner has no poller_tags
            self.app.poller_tags = []
        if 'max_plugins_output_length' in conf['global']:
            self.app.max_plugins_output_length = conf['global']['max_plugins_output_length']
        else: #a reactionner does not really care about it
            self.app.max_plugins_output_length = 8192
        print "Max output length", self.app.max_plugins_output_length
        #Set the timezone the Arbiter gave us, if any
        use_timezone = conf['global']['use_timezone']
        if use_timezone != 'NOTSET':
            logger.log("[%s] Setting our timezone to %s" % (self.name, use_timezone))
            os.environ['TZ'] = use_timezone
            time.tzset()

        logger.log("We have our schedulers : %s" % (str(self.schedulers)))

    #The Arbiter asks us to not manage a scheduler_id anymore.
    #I do it and don't ask why
    def remove_from_conf(self, sched_id):
        try:
            del self.schedulers[sched_id]
        except KeyError:
            pass

    #The Arbiter asks me which sched_id I manage. If it is not OK with it,
    #it will ask me to remove one or more sched_id
    def what_i_managed(self):
        return self.schedulers.keys()

    #Used by the Arbiter to know if we are alive
    def ping(self):
        print "We are asked for a ping"
        return True


    #Used by the Arbiter to know if we have a conf or not.
    #It can be useful when we should be doing nothing but do not know it,
    #because the Arbiter can KILL US for that!
    def have_conf(self):
        return self.app.have_conf

    #Called by the Arbiter if it thinks we are running but we must not be
    #(e.g. I am a spare that took a conf but the master came back, so I must
    #die and wait for a new conf).
    #Us: No please...
    #Arbiter: I don't care, hasta la vista baby!
    #Us: ... <- Nothing! We are dead already! You don't follow
    #anything or what?? Reading code is not a job for eyes only...
    def wait_new_conf(self):
        print "Arbiter wants me to wait for a new conf"
        self.schedulers.clear()
        self.app.have_conf = False


#Interface for Brokers.
#They connect here and get all broks (data for brokers).
#Data must be ORDERED! (initial status BEFORE updates...)
class IBroks(Pyro.core.ObjBase):
    #We keep the app link
    def __init__(self, app):
        Pyro.core.ObjBase.__init__(self)
        self.app = app
        self.running_id = random.random()
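
    #Note: running_id is drawn at random at each start; presumably a broker
    #compares it with the last value it saw to detect that this satellite
    #restarted and that the broks it already fetched should be voided.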

    #Does the broker need to void its broks?
    def get_running_id(self):
        return self.running_id


    #A broker asks us for our broks
    def get_broks(self):
        #print "We are asked for broks"
        res = self.app.get_broks()
        return res


    #Ping? Pong!
    def ping(self):
        return None


#Our main APP class
class Satellite(Daemon):
    def __init__(self, config_file, is_daemon, do_replace, debug, debug_file):
        self.print_header()

        #From Daemon, to manage signals. Calls self.manage_signal if
        #it exists, a dummy function otherwise
        self.set_exit_handler()

        #Log init
        self.log = logger
        self.log.load_obj(self)

        #The config reading part
        self.config_file = config_file
        #Read the config file if it exists;
        #if not, default properties are used
        self.parse_config_file()

        if self.config_file != None:
            #Some paths can be relative. We must make them full by taking
            #the config file location as reference
            self.relative_paths_to_full(os.path.dirname(config_file))

        #Check that another instance is not already running (with the same conf)
        self.check_parallel_run(do_replace)

        #If the admin doesn't care about security, I allow running as root
        insane = not self.idontcareaboutsecurity

        #Keep broks so they can be eaten by a broker
        self.broks = {}

        #Try to change the user (not on nt for the moment)
        #TODO: change user on nt
        if os.name != 'nt':
            self.change_user(insane)
        else:
            logger.log("Sorry, you can't change user on this system")

        #Now the daemon part, if needed
        if is_daemon:
            self.create_daemon(do_debug=debug, debug_file=debug_file)

        #Now the specific stuff
        #Bool to know if we have received a conf from the Arbiter
        self.have_conf = False
        self.have_new_conf = False
        self.s = Queue() #Global Master -> Slave
        #self.m = Queue() #Slave -> Master
        self.manager = Manager()
        self.returns_queue = self.manager.list()

        #Our schedulers
        self.schedulers = {}
        self.workers = {} #dict of active workers

        self.nb_actions_in_workers = 0

        #Init stats like Load for workers
        self.wait_ratio = Load(initial_value=1)

        self.t_each_loop = time.time() #used to track system time changes
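
    #Quick map of the main runtime structures set up above:
    # self.s             : master -> workers queue of Message('Do', action)
    # self.returns_queue : shared list where workers drop finished actions
    # self.schedulers[sched_id]['wait_homerun'] : results waiting to go back
    #                      to that scheduler (filled later by put_conf)
    # self.broks         : broks waiting to be fetched by a broker via IBroks
    # self.workers       : active Worker processes, indexed by worker id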

    #Initialise or re-initialise the connection with a scheduler
    def pynag_con_init(self, id):
        sched = self.schedulers[id]
        #If sched is not active, I do not try to init it;
        #it would just be useless
        if not sched['active']:
            return

        logger.log("[%s] Init of connection with %s at %s" % (self.name, sched['name'], sched['uri']))
        running_id = sched['running_id']
        sched['con'] = Pyro.core.getProxyForURI(sched['uri'])

        #Short timeout of 5s for the handshake,
        #and get the running id
        try:
            pyro.set_timeout(sched['con'], 5)
            new_run_id = sched['con'].get_running_id()
        except (Pyro.errors.ProtocolError, Pyro.errors.NamingError, cPickle.PicklingError, KeyError, Pyro.errors.CommunicationError) , exp:
            logger.log("[%s] Scheduler %s is not initialised or got a network problem: %s" % (self.name, sched['name'], str(exp)))
            sched['con'] = None
            return

        #The scheduler has been restarted: it has a new run_id.
        #So we clear all verifs, they are obsolete now.
        if sched['running_id'] != 0 and new_run_id != running_id:
            logger.log("[%s] The running id of the scheduler %s changed, we must clear its actions" % (self.name, sched['name']))
            sched['wait_homerun'].clear()
        sched['running_id'] = new_run_id
        logger.log("[%s] Connection OK with scheduler %s" % (self.name, sched['name']))

    #Manage an action return from a Worker.
    #We just put it into the sched it is for
    #and we clean unused properties like sched_id
    def manage_action_return(self, action):
        #OK, it's a result. We get it and fill the wait_homerun of the good sched_id
        sched_id = action.sched_id
        #Now we know where to put the action, we do not need sched_id anymore
        del action.sched_id
        action.set_status('waitforhomerun')
        self.schedulers[sched_id]['wait_homerun'][action.get_id()] = action
        #We update stats
        self.nb_actions_in_workers -= 1

    #Return the checks/actions to their scheduler and clean them
    #REF: doc/shinken-action-queues.png (6)
    def manage_returns(self):
        total_sent = 0
        #For all schedulers, we check waitforhomerun and we send the results back
        for sched_id in self.schedulers:
            sched = self.schedulers[sched_id]
            #If sched is not active, I do not try to return anything
            if not sched['active']:
                continue
            #Now ret has all the results, we can return them
            send_ok = False
            ret = sched['wait_homerun'].values()
            if ret != []:
                try:
                    con = sched['con']
                    if con is not None: #None = not initialized
                        send_ok = con.put_results(ret)
                #Not connected or sched is gone
                except (Pyro.errors.ProtocolError, KeyError) , exp:
                    print exp
                    self.pynag_con_init(sched_id)
                    return
                except AttributeError , exp: #the scheduler must not be initialized
                    print exp
                except Exception , exp:
                    print ''.join(Pyro.util.getPyroTraceback(exp))
                    sys.exit(0)

                #We clean ONLY if the send is OK
                if send_ok:
                    sched['wait_homerun'].clear()
                else:
                    self.pynag_con_init(sched_id)
                    logger.log("Sending results to scheduler %s failed!" % sched['name'])

    #Used to wait for a conf from the Arbiter.
    #The Arbiter sends the conf to our daemon port; put_conf sets the
    #have_conf prop when it sends us something
    #(it can also just do a ping).
    def wait_for_initial_conf(self):
        logger.log("Waiting for initial configuration")
        timeout = 1.0
        #The Arbiter did not set our have_conf param yet
        while not self.have_conf:
            socks = pyro.get_sockets(self.daemon)

            before = time.time()
            ins, outs, exs = select.select(socks, [], [], timeout) # 'foreign' event loop

            #Manage a possible time change (our 'before' will be changed by the diff)
            diff = self.check_for_system_time_change()
            before += diff

            if ins != []:
                for sock in socks:
                    if sock in ins:
                        pyro.handleRequests(self.daemon, sock)
                        after = time.time()
                        diff = after - before
                        timeout = timeout - diff
                        break # no need to continue with the for loop
            else: #Timeout
                sys.stdout.write(".")
                sys.stdout.flush()
                timeout = 1.0

            if timeout < 0:
                timeout = 1.0

    #The Arbiter can send us a new conf on the daemon port.
    #We do not want to lose time on it, so it's not a blocking
    #wait (the timeout can be 0s).
    #If it sends us a new conf, we reinit the connections with all schedulers
    def watch_for_new_conf(self, timeout_daemon):
        socks = pyro.get_sockets(self.daemon)

        ins, outs, exs = select.select(socks, [], [], timeout_daemon)
        if ins != []:
            for sock in socks:
                if sock in ins:
                    pyro.handleRequests(self.daemon, sock)

        #have_new_conf is set by put_conf,
        #so another handle will not make a con_init
        if self.have_new_conf:
            for sched_id in self.schedulers:
                print "Got a new conf"
                self.pynag_con_init(sched_id)
            self.have_new_conf = False

    #Check if our system time changed. If so, compensate for it
    def check_for_system_time_change(self):
        now = time.time()
        difference = now - self.t_each_loop
        #If we have more than a 15 min time change, we need to compensate
        if abs(difference) > 900:
            self.compensate_system_time_change(difference)

        #Now set the new value for the tick loop
        self.t_each_loop = now

        #Return the diff if there is one, or just 0
        if abs(difference) > 900:
            return difference
        else:
            return 0

    #If we've got a system time change, we need to compensate for it.
    #For now, we do not actually do anything here.
    def compensate_system_time_change(self, difference):
        logger.log('Warning: A system time change of %s has been detected. Compensating...' % difference)
        #We would only need to change some values

    #Create and launch a new Worker, and put it into self.workers.
    #It can be mortal or not
    def create_and_launch_worker(self, mortal=True):
        w = Worker(1, self.s, self.returns_queue, self.processes_by_worker, \
                   mortal=mortal, max_plugins_output_length=self.max_plugins_output_length)
        self.workers[w.id] = w
        logger.log("[%s] Allocating new Worker : %s" % (self.name, w.id))
        self.workers[w.id].start()

    #Manage the signal function
    #TODO: manage more than just quit
    #Frame is just garbage
    def manage_signal(self, sig, frame):
        logger.log("\nExiting with signal %s" % sig)
        logger.log('Stopping all workers')
        for w in self.workers.values():
            try:
                w.terminate()
                w.join(timeout=1)
                #queue = w.return_queue
                #self.return_messages.remove(queue)
            except AttributeError: #an already dead worker
                pass
            except AssertionError: #in a worker
                pass
        logger.log('Stopping all network connections')
        self.daemon.disconnect(self.interface)
        self.daemon.disconnect(self.brok_interface)
        self.daemon.shutdown(True)
        logger.log("Unlinking pid file")
        try:
            os.unlink(self.pidfile)
        except OSError, exp:
            print "Error while deleting pid file:", exp
        logger.log("Exiting")
        sys.exit(0)

    #A simple function to add objects to self,
    #like broks in self.broks, etc.
    #TODO: better tag ID?
    def add(self, elt):
        if isinstance(elt, Brok):
            #For a brok, we TAG it with our instance_id
            elt.data['instance_id'] = 0
            self.broks[elt.id] = elt
            return

    #Someone asks us for our broks. We send them and clean the queue
    def get_broks(self):
        res = copy.copy(self.broks)
        self.broks.clear()
        return res

    #Workers are processes. They can die in numerous ways, like:
    #*99.99%  : a bug in the code, sorry :p
    #*0.005%  : a mix between a stupid admin (or an admin without coffee)
    #           and a kill command
    #*0.005%  : alien attack
    #So they need to be detected, and restarted if needed
    def check_and_del_zombie_workers(self):
        #active_children does a join with every one, useful :)
        act = active_children()

        w_to_del = []
        for w in self.workers.values():
            #If a worker goes down and we did not ask for it, it's not
            #good: we could think we have a worker when it's not true.
            #So we del it
            if not w.is_alive():
                logger.log("[%s] Warning : the worker %s went down unexpectedly!" % (self.name, w.id))
                #AIM... Press FIRE... HEAD SHOT!
                w.terminate()
                w.join(timeout=1)
                w_to_del.append(w.id)
        #OK, now really del the workers
        for id in w_to_del:
            del self.workers[id]

    #Here we create new workers if the queue load (len of verifs) is too long
    def adjust_worker_number_by_load(self):
        #TODO: get a real value for the load
        wish_worker = 1
        #I want at least min_workers or wish_worker (the biggest),
        #but no more than max_workers
        while len(self.workers) < self.min_workers \
                  or (wish_worker > len(self.workers) and len(self.workers) < self.max_workers):
            self.create_and_launch_worker()
        #TODO: if len(workers) > 2*wish, maybe we can kill a worker?

    #We get new actions from the schedulers, we create a Message and we
    #put it in the s queue (from master to slave)
    #REF: doc/shinken-action-queues.png (1)
    def get_new_actions(self):
        #Here is the difference between a
        #poller and a reactionner:
        #a poller only does checks,
        #a reactionner does actions
        do_checks = self.__class__.do_checks
        do_actions = self.__class__.do_actions

        #We ask each scheduler for new checks/actions and queue them for the workers
        for sched_id in self.schedulers:
            sched = self.schedulers[sched_id]
            #If sched is not active, I do not try to get anything
            if not sched['active']:
                continue

            try:
                con = sched['con']
                if con is not None: #None = not initialized
                    pyro.set_timeout(con, 120)
                    #OK, go for it :)
                    tmp = con.get_checks(do_checks=do_checks, do_actions=do_actions, poller_tags=self.poller_tags)
                    print "Ask actions to", sched_id, "got", len(tmp)
                    #We 'tag' them with sched_id and put them into the queue for workers
                    #REF: doc/shinken-action-queues.png (2)
                    for a in tmp:
                        a.sched_id = sched_id
                        a.set_status('queue')
                        msg = Message(id=0, type='Do', data=a)
                        self.s.put(msg)
                        #Update stats
                        self.nb_actions_in_workers += 1
                else: #no con? make the connection
                    self.pynag_con_init(sched_id)
            #OK, con is not known, so we create it,
            #or maybe the connection was lost, we recreate it
            except (KeyError, Pyro.errors.ProtocolError) , exp:
                print exp
                self.pynag_con_init(sched_id)
            #The scheduler must not be initialized,
            #or the scheduler must not have checks
            except (AttributeError, Pyro.errors.NamingError) , exp:
                print exp
            #What the F**k? We do not know what happened,
            #so... bye bye :)
            except Pyro.errors.ConnectionClosedError , exp:
                print exp
                self.pynag_con_init(sched_id)
            except Exception , exp:
                print ''.join(Pyro.util.getPyroTraceback(exp))
                sys.exit(0)
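
    #Note: do_checks and do_actions used above are expected to be class
    #attributes set by the concrete poller or reactionner subclass of
    #Satellite (a poller only does checks, a reactionner only does actions).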

    #Main function, will loop forever
    def main(self):
        Pyro.config.PYRO_COMPRESSION = 1
        Pyro.config.PYRO_MULTITHREADED = 0
        Pyro.config.PYRO_STORAGE = self.workdir
        logger.log("Using working directory : %s" % os.path.abspath(self.workdir))
        logger.log("Opening port: %s" % self.port)
        #Daemon init
        self.daemon = pyro.init_daemon(self.host, self.port)

        #Now we create the interfaces
        self.interface = IForArbiter(self)
        self.brok_interface = IBroks(self)

        #And we register them
        self.uri2 = pyro.register(self.daemon, self.interface, "ForArbiter")
        self.uri3 = pyro.register(self.daemon, self.brok_interface, "Broks")

        #We wait for the initial conf
        self.wait_for_initial_conf()

        #Connection init with the schedulers
        for sched_id in self.schedulers:
            print "Init main"
            self.pynag_con_init(sched_id)
        self.have_new_conf = False

        #Allocate mortal workers
        for i in xrange(1, self.min_workers):
            self.create_and_launch_worker() #create mortal worker

        #Now the main loop
        timeout = self.polling_interval #default 1.0
        while True:
            begin_loop = time.time()

            #Maybe the Arbiter asked us to wait for a new conf.
            #If so, we must restart everything...
            if self.have_conf == False:
                print "Begin wait initial"
                self.wait_for_initial_conf()
                print "End wait initial"
                #for sched_id in self.schedulers:
                #    print "Init main2"
                #    self.pynag_con_init(sched_id)

            #Now we check if the Arbiter speaks to us on the daemon port.
            #If so, we listen to it.
            #When it pushes us a conf, we reinit the connections.
            #We sleep here while waiting for a new conf :)
            self.watch_for_new_conf(timeout)

            #Manage a possible time change (our 'begin_loop' will be changed by the diff)
            diff = self.check_for_system_time_change()
            begin_loop += diff

            try:
                after = time.time()
                timeout -= after - begin_loop

                if timeout < 0: #we went over the timeout
                    #print "Time out", timeout
                    raise Empty

            except Empty , exp: #Timeout part
                print " ======================== "
                after = time.time()
                timeout = self.polling_interval

                #Check if zombie workers are among us :)
                #If so: KILL THEM ALL!!!
                self.check_and_del_zombie_workers()

                #Print stats for debug
                for sched_id in self.schedulers:
                    sched = self.schedulers[sched_id]
                    #In workers we've got: actions sent to the queue - queue size
                    print '[%d][%s]Stats : Workers:%d (Queued:%d Processing:%d ReturnWait:%d)' % \
                        (sched_id, sched['name'], len(self.workers), self.s.qsize(), \
                        self.nb_actions_in_workers - self.s.qsize(), len(self.returns_queue))

                #Before returning or getting new actions, see how we manage
                #the old ones: are they still in the queue (s)? If so, we
                #must wait more or at least have more workers
                wait_ratio = self.wait_ratio.get_load()
                if self.s.qsize() != 0 and wait_ratio < 5*self.polling_interval:
                    print "I decide to up wait ratio"
                    self.wait_ratio.update_load(wait_ratio * 2)
                else:
                    #Go back toward self.polling_interval on a normal run;
                    #if wait_ratio was > 5*self.polling_interval,
                    #this brings it near 5 because if it is < 5, it goes up :)
                    self.wait_ratio.update_load(self.polling_interval)
                wait_ratio = self.wait_ratio.get_load()
                print "Wait ratio:", wait_ratio

                #We can wait more than 1s if needed,
                #no more than 5s, but no less than 1s
                timeout = timeout * wait_ratio
                timeout = max(self.polling_interval, timeout)
                timeout = min(5*self.polling_interval, timeout)
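                #Net effect: the loop timeout always stays between
                #polling_interval and 5*polling_interval; the ratio doubles
                #while the worker queue is backed up and falls back toward
                #polling_interval once it drains.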

                #Maybe we do not have enough workers, we check for it
                #and launch new ones if needed
                self.adjust_worker_number_by_load()

                #Manage all messages we've got in the last timeout
                #for queue in self.return_messages:
                while len(self.returns_queue) != 0:
                    self.manage_action_return(self.returns_queue.pop())

                #Now we can get new actions from the schedulers
                self.get_new_actions()

                #We send all finished checks back
                #REF: doc/shinken-action-queues.png (6)
                self.manage_returns()