#!/usr/bin/env python
#Copyright (C) 2009-2010 :
#    Gabes Jean, naparuba@gmail.com
#    Gerhard Lausser, Gerhard.Lausser@consol.de
#    Gregory Starck, g.starck@gmail.com
#
#This file is part of Shinken.
#
#Shinken is free software: you can redistribute it and/or modify
#it under the terms of the GNU Affero General Public License as published by
#the Free Software Foundation, either version 3 of the License, or
#(at your option) any later version.
#
#Shinken is distributed in the hope that it will be useful,
#but WITHOUT ANY WARRANTY; without even the implied warranty of
#MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
#GNU Affero General Public License for more details.
#
#You should have received a copy of the GNU Affero General Public License
#along with Shinken. If not, see <http://www.gnu.org/licenses/>.


#This class is an interface for the reactionner and poller daemons.
#The satellite listens for configuration from the Arbiter on a port.
#The configuration given by the arbiter is the schedulers where the satellite
#will take actions.
#When already launched and with a conf, the satellite still listens to the arbiter
#(on a timeout).
#If the arbiter wants it to have a new conf, the satellite forgets the old schedulers
#(and the actions in them),
#takes the new ones and does the (new) job.

from Queue import Empty
from multiprocessing import Queue, Manager, active_children
import os
import errno
import stat
import copy
import time
import sys
import select
import cPickle
import random

try:
    import shinken.pyro_wrapper as pyro
except ImportError:
    print "Shinken requires the Python Pyro module. Please install it."
    sys.exit(1)

Pyro = pyro.Pyro

from message import Message
from worker import Worker
from load import Load
from daemon import Daemon
from log import logger
from brok import Brok
from check import Check
from notification import Notification
from eventhandler import EventHandler


#Interface for the Arbiter, our big MASTER
#It pushes us our conf
class IForArbiter(Pyro.core.ObjBase):
    #We keep the app link because we are just here for it
    def __init__(self, app):
        Pyro.core.ObjBase.__init__(self)
        self.app = app
        self.schedulers = app.schedulers

    #Function called by the arbiter to give us our conf.
    #conf must be a dict with:
    #'schedulers' : schedulers dict (by id) with address and port
    #TODO: catch the case where the Arbiter sends something we already have
    #(same id+address+port) -> just do nothing :)
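    #As an illustration only (the values below are made up, the keys are the
    #ones read by this method), a conf looks roughly like:
    #  {'global': {'poller_name': 'poller-1', 'max_workers': 4, 'min_workers': 1,
    #              'processes_by_worker': 256, 'polling_interval': 1,
    #              'use_timezone': 'NOTSET'},
    #   'schedulers': {0: {'name': 'scheduler-1', 'address': 'localhost',
    #                      'port': 7768, 'active': True}}}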
    def put_conf(self, conf):
        self.app.have_conf = True
        self.app.have_new_conf = True
        #Get our name from the globals
        if 'poller_name' in conf['global']:
            self.name = conf['global']['poller_name']
        elif 'reactionner_name' in conf['global']:
            self.name = conf['global']['reactionner_name']
        else:
            self.name = 'Unnamed satellite'
        self.app.name = self.name

        print "[%s] Sending us a configuration %s " % (self.name, conf)
        #If we already have a scheduler in our conf, we keep its pending
        #results (wait_homerun) but take the new parameters
        for sched_id in conf['schedulers']:
            already_got = False
            if sched_id in self.schedulers:
                logger.log("[%s] We already got the conf %d (%s)" % (self.name, sched_id, conf['schedulers'][sched_id]['name']))
                already_got = True
                wait_homerun = self.schedulers[sched_id]['wait_homerun']
            s = conf['schedulers'][sched_id]
            self.schedulers[sched_id] = s

            uri = pyro.create_uri(s['address'], s['port'], 'Checks', self.app.use_ssl)
            print "DBG: scheduler URI:", uri

            self.schedulers[sched_id]['uri'] = uri
            if already_got:
                self.schedulers[sched_id]['wait_homerun'] = wait_homerun
            else:
                self.schedulers[sched_id]['wait_homerun'] = {}
            self.schedulers[sched_id]['running_id'] = 0
            self.schedulers[sched_id]['active'] = s['active']

            #And then we connect to it :)
            self.app.pynag_con_init(sched_id)

        #Now the limits part
        self.app.max_workers = conf['global']['max_workers']
        self.app.min_workers = conf['global']['min_workers']
        self.app.processes_by_worker = conf['global']['processes_by_worker']
        self.app.polling_interval = conf['global']['polling_interval']
        if 'poller_tags' in conf['global']:
            self.app.poller_tags = conf['global']['poller_tags']
        else: #for a reactionner, poller_tags stays empty
            self.app.poller_tags = []
        if 'max_plugins_output_length' in conf['global']:
            self.app.max_plugins_output_length = conf['global']['max_plugins_output_length']
        else: #for a reactionner, we don't really care about it
            self.app.max_plugins_output_length = 8192
        print "Max output length", self.app.max_plugins_output_length
        #Set the timezone the arbiter gave us, if any
        use_timezone = conf['global']['use_timezone']
        if use_timezone != 'NOTSET':
            logger.log("[%s] Setting our timezone to %s" % (self.name, use_timezone))
            os.environ['TZ'] = use_timezone
            time.tzset()

        logger.log("We have our schedulers : %s" % (str(self.schedulers)))


    #The arbiter asks us to not manage a scheduler_id anymore
    #I do it and don't ask why
    def remove_from_conf(self, sched_id):
        try:
            del self.schedulers[sched_id]
        except KeyError:
            pass


    #The arbiter asks me which sched_id I manage. If it is not OK with it,
    #it will ask me to remove one or more sched_id
    def what_i_managed(self):
        return self.schedulers.keys()


    #Used by the arbiter to know if we are alive
    def ping(self):
        print "We got asked for a ping"
        return True


    #Used by the arbiter to know if we have a conf or not.
    #This can be useful when we should be doing nothing but
    #are not aware of it, because the arbiter can KILL US!
    def have_conf(self):
        return self.app.have_conf


    #Called by the arbiter if it thinks we are running but we must not (like
    #if I was a spare that took a conf but the master came back, I must die
    #and wait for a new conf)
    #Us : No please...
    #Arbiter : I don't care, hasta la vista baby!
    #Us : ... <- Nothing! We are dead! You don't follow
    #anything or what?? Reading code is not a job for eyes only...
    def wait_new_conf(self):
        print "Arbiter wants me to wait for a new conf"
        self.schedulers.clear()
        self.app.have_conf = False


#Interface for Brokers
#They connect here and get all broks (data for brokers)
#Data must be ORDERED! (initial status BEFORE updates...)
class IBroks(Pyro.core.ObjBase):
    #we keep the app link
    def __init__(self, app):
        Pyro.core.ObjBase.__init__(self)
        self.app = app
        self.running_id = random.random()


    #The broker asks for our running_id so it knows if it must void its broks
    #(a new running_id means we restarted)
    def get_running_id(self):
        return self.running_id


    #A broker asks us for our broks
    def get_broks(self):
        #print "We ask us broks"
        res = self.app.get_broks()
        return res


    #Ping? Pong!
    def ping(self):
        return None


#Our main APP class
class Satellite(Daemon):
    def __init__(self, config_file, is_daemon, do_replace, debug, debug_file):

        self.check_shm()

        Daemon.__init__(self, config_file, is_daemon, do_replace, debug, debug_file)

        # Keep broks so they can be eaten by a broker
        self.broks = {}

        # Our schedulers
        self.schedulers = {}
        self.workers = {} # dict of active workers

        self.nb_actions_in_workers = 0

        # Init stats like Load for workers
        self.wait_ratio = Load(initial_value=1)
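        # wait_ratio is used in do_loop_turn() to stretch the loop timeout when
        # the master -> worker queue is backing up, and to bring it back toward
        # polling_interval when it is not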

        self.t_each_loop = time.time() # used to track system time change

        #Now the specific stuff
        #Bool to know if we have received conf from arbiter
        self.have_conf = False
        self.have_new_conf = False

        # Now we create the interfaces
        self.interface = IForArbiter(self)
        self.brok_interface = IBroks(self)

        # Just for having these attributes defined here. explicit > implicit ;)
        self.uri2 = None
        self.uri3 = None
        self.s = None
        self.manager = None
        self.returns_queue = None
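        # They are filled in do_post_daemon_init():
        #  self.s             : multiprocessing Queue, master -> workers (actions to run)
        #  self.returns_queue : manager list, workers -> master (finished actions)
        #  self.uri2 / self.uri3 : Pyro URIs of the IForArbiter and IBroks interfaces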


    def pynag_con_init(self, id):
        """ Initialize or re-initialize connection with a scheduler """
        sched = self.schedulers[id]
        #If sched is not active, I do not try to init
        #it is just useless
        if not sched['active']:
            return

        logger.log("[%s] Init connection with %s at %s" % (self.name, sched['name'], sched['uri']))
        running_id = sched['running_id']
        sch_con = sched['con'] = Pyro.core.getProxyForURI(sched['uri'])

        #Set a short (5s) timeout on the connection
        #and get the running id
        try:
            pyro.set_timeout(sch_con, 5)
            new_run_id = sch_con.get_running_id()
        except (Pyro.errors.ProtocolError, Pyro.errors.NamingError, cPickle.PicklingError, KeyError, Pyro.errors.CommunicationError), exp:
            logger.log("[%s] Scheduler %s is not initialized or got a network problem: %s" % (self.name, sched['name'], str(exp)))
            sched['con'] = None
            return

        #The scheduler has been restarted: it has a new run_id.
        #So we clear all verifs, they are obsolete now.
        if sched['running_id'] != 0 and new_run_id != running_id:
            logger.log("[%s] The running id of the scheduler %s changed, we must clear its actions" % (self.name, sched['name']))
            sched['wait_homerun'].clear()
        sched['running_id'] = new_run_id
        logger.log("[%s] Connection OK with scheduler %s" % (self.name, sched['name']))


    #Manage action returns from Workers
    #We just put them into the sched they are for
    #and we clean unused properties like sched_id
    def manage_action_return(self, action):
        #Ok, it's a result. We get it, and fill verifs of the good sched_id
        sched_id = action.sched_id
        #Now we know where to put the action, we do not need sched_id anymore
        del action.sched_id
        action.status = 'waitforhomerun'
        self.schedulers[sched_id]['wait_homerun'][action.get_id()] = action
        #We update stats
        self.nb_actions_in_workers -= 1


    #Return the finished actions to their scheduler and clean them from wait_homerun
    #REF: doc/shinken-action-queues.png (6)
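    #wait_homerun is the per-scheduler dict {action_id: action} filled by
    #manage_action_return(); we flush it with one put_results() call per
    #scheduler and only clear it if that call succeeded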
    def manage_returns(self):
        total_sent = 0
        #For all schedulers, we check for waitforhomerun and we send back results
        for sched_id in self.schedulers:
            sched = self.schedulers[sched_id]
            #If sched is not active, I do not try to return
            if not sched['active']:
                continue
            #Now ret has all verifs, we can return them
            ret = sched['wait_homerun'].values()
            #Nothing to send back for this scheduler
            if ret == []:
                continue
            send_ok = False
            try:
                con = sched['con']
                if con is not None: #None = not initialized
                    send_ok = con.put_results(ret)
            #Not connected or sched is gone
            except (Pyro.errors.ProtocolError, KeyError), exp:
                print exp
                self.pynag_con_init(sched_id)
                return
            except AttributeError, exp: #the scheduler must not be initialized
                print exp
            except Exception, exp:
                print ''.join(Pyro.util.getPyroTraceback(exp))
                sys.exit(0)

            #We clean ONLY if the send is OK
            if send_ok:
                sched['wait_homerun'].clear()
            else:
                self.pynag_con_init(sched_id)
                logger.log("Sending results failed!")


    #Used to wait for a conf from the arbiter.
    #The arbiter sends the conf to our daemon port; put_conf sets the have_conf
    #prop when it sends us something
    #(it can also just do a ping)
    def wait_for_initial_conf(self):
        logger.log("Waiting for initial configuration")
        timeout = 1.0
        #The arbiter has not set our have_conf param yet
        while not self.have_conf and not self.interrupted:
            before = time.time()

            socks = pyro.get_sockets(self.daemon)
            ins = self.get_socks_activity(socks, timeout)

            #Manage a possible time change (our 'before' will be changed by the diff)
            diff = self.check_for_system_time_change()
            before += diff

            if ins != []:
                for sock in socks:
                    if sock in ins:
                        pyro.handleRequests(self.daemon, sock)
                        after = time.time()
                        diff = after - before
                        timeout = timeout - diff
                        break # no need to continue with the for loop
            else: #Timeout
                sys.stdout.write(".")
                sys.stdout.flush()
                timeout = 1.0

            if timeout < 0:
                timeout = 1.0

        if self.interrupted:
            self.request_stop()


    #The arbiter can send us a new conf on the daemon port.
    #We do not want to lose time on it, so it's not a blocking
    #wait, timeout = 0s
    #If it sends us a new conf, we reinit the connections of all schedulers
    def watch_for_new_conf(self, timeout_daemon):
        socks = pyro.get_sockets(self.daemon)
        ins = self.get_socks_activity(socks, timeout_daemon)
        if ins != []:
            for sock in socks:
                if sock in ins:
                    pyro.handleRequests(self.daemon, sock)

                    #have_new_conf is set by put_conf
                    #so another handle will not make a con_init
                    if self.have_new_conf:
                        for sched_id in self.schedulers:
                            print "Got a new conf"
                            self.pynag_con_init(sched_id)
                        self.have_new_conf = False


    #Check if our system time changed. If so, compensate for it
    def check_for_system_time_change(self):
        now = time.time()
        difference = now - self.t_each_loop
        #If we have more than a 15 min time change, we need to compensate for it
        if abs(difference) > 900:
            self.compensate_system_time_change(difference)

        #Now set the new value for the tick loop
        self.t_each_loop = now

        #return the diff if needed, or just 0
        if abs(difference) > 900:
            return difference
        else:
            return 0


    #If we've got a system time change, we need to compensate for it,
    #but for now we do not do anything in fact.
    def compensate_system_time_change(self, difference):
        logger.log('Warning: A system time change of %s has been detected. Compensating...' % difference)
        #We only need to change some values


    #Create and launch a new worker, and put it into self.workers
    #It can be mortal or not
    def create_and_launch_worker(self, mortal=True):
        w = Worker(1, self.s, self.returns_queue, self.processes_by_worker, \
                   mortal=mortal, max_plugins_output_length=self.max_plugins_output_length)
        self.workers[w.id] = w
        logger.log("[%s] Allocating new Worker : %s" % (self.name, w.id))
        self.workers[w.id].start()


    def do_stop(self):
        logger.log('Stopping all workers')
        for w in self.workers.values():
            try:
                w.terminate()
                w.join(timeout=1)
                #queue = w.return_queue
                #self.return_messages.remove(queue)
            except AttributeError: #A worker that already died
                pass
            except AssertionError: #In a worker
                pass
        logger.log('Stopping all network connections')
        self.daemon.disconnect(self.interface)
        self.daemon.disconnect(self.brok_interface)
        self.daemon.shutdown(True)


    #A simple function to add objects to self
    #like broks in self.broks, etc
    #TODO : better tag ID?
    def add(self, elt):
        if isinstance(elt, Brok):
            #For brok, we TAG brok with our instance_id
            elt.data['instance_id'] = 0
            self.broks[elt.id] = elt
            return


    #Someone asks us for our broks. We send them, and clean the queue
    def get_broks(self):
        res = copy.copy(self.broks)
        self.broks.clear()
        return res


    #Workers are processes, they can die in numerous ways, like:
    #*99.99% : bug in the code, sorry :p
    #*0.005% : a mix between a stupid admin (or an admin without coffee)
    #and a kill command
    #*0.005% : alien attack
    #So they need to be detected, and restarted if needed
    def check_and_del_zombie_workers(self):
        #active_children() makes a join with every finished one, useful :)
        act = active_children()

        w_to_del = []
        for w in self.workers.values():
            #If a worker goes down and we did not ask for it, it's not
            #good : we would think we have a worker and it would not be true
            #So we del it
            if not w.is_alive():
                logger.log("[%s] Warning : the worker %s went down unexpectedly!" % (self.name, w.id))
                #AIM ... Press FIRE ... <B>HEAD SHOT!</B>
                w.terminate()
                w.join(timeout=1)
                w_to_del.append(w.id)
        #OK, now really del the workers
        for id in w_to_del:
            del self.workers[id]


    #Here we create new workers if the queue load (len of verifs) is too long
    def adjust_worker_number_by_load(self):
        #TODO : get a real value for the load
        wish_worker = 1
        #I want at least min_workers or wish_worker (whichever is bigger), but not more than max_workers
        while len(self.workers) < self.min_workers \
                or (wish_worker > len(self.workers) and len(self.workers) < self.max_workers):
            self.create_and_launch_worker()
        #TODO : if len(workers) > 2*wish, maybe we can kill a worker?


    #We get new actions from the schedulers, we create a Message and we
    #put it in the s queue (from master to slave)
    #REF: doc/shinken-action-queues.png (1)
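    #The lifecycle of an action as handled by this file:
    # scheduler.get_checks() -> Message(type='Do') put on self.s -> handled by a
    # Worker process -> result comes back via self.returns_queue ->
    # manage_action_return() stores it in wait_homerun -> manage_returns()
    # sends it back to the scheduler with put_results()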
    def get_new_actions(self):
        #Here are the differences between a
        #poller and a reactionner:
        #Poller will only do checks,
        #reactionner will only do actions
        do_checks = self.__class__.do_checks
        do_actions = self.__class__.do_actions

        #We check for new checks in each scheduler and put the results in new_checks
        for sched_id in self.schedulers:
            sched = self.schedulers[sched_id]
            #If sched is not active, I do not try to get anything from it
            if not sched['active']:
                continue

            try:
                con = sched['con']
                if con is not None: #None = not initialized
                    pyro.set_timeout(con, 120)
                    #OK, go for it :)
                    tmp = con.get_checks(do_checks=do_checks, do_actions=do_actions, poller_tags=self.poller_tags)
                    print "Ask actions to", sched_id, "got", len(tmp)
                    #We 'tag' them with sched_id and put them into the queue for workers
                    #REF: doc/shinken-action-queues.png (2)
                    for a in tmp:
                        a.sched_id = sched_id
                        a.status = 'queue'
                        msg = Message(id=0, type='Do', data=a)
                        self.s.put(msg)
                        #Update stats
                        self.nb_actions_in_workers += 1
                else: #no con? make the connection
                    self.pynag_con_init(sched_id)
            #Ok, con is not known, so we create it
            #Or maybe the connection was lost, we recreate it
            except (KeyError, Pyro.errors.ProtocolError), exp:
                print exp
                self.pynag_con_init(sched_id)
            #The scheduler must not be initialized
            #or the scheduler must not have checks
            except (AttributeError, Pyro.errors.NamingError), exp:
                print exp
            #What the F**k? We do not know what happened,
            #so.. bye bye :)
            except Pyro.errors.ConnectionClosedError, exp:
                print exp
                self.pynag_con_init(sched_id)
            except Exception, exp:
                print ''.join(Pyro.util.getPyroTraceback(exp))
                sys.exit(0)
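

    #One loop turn:
    # handle arbiter requests (new conf, ping) until the polling timeout expires,
    # then clean dead workers and spawn new ones if needed, drain the
    # returns_queue, fetch new actions from the schedulers and send the finished
    # ones back to them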
    def do_loop_turn(self):
        begin_loop = time.time()

        #Maybe the arbiter asked us to wait for a new conf
        #If so, we must restart everything...
        if self.have_conf == False:
            print "Begin wait initial"
            self.wait_for_initial_conf()
            print "End wait initial"
            #for sched_id in self.schedulers:
            #    print "Init main2"
            #    self.pynag_con_init(sched_id)

        #Now we check if the arbiter speaks to us on the daemon port.
        #If so, we listen to it
        #When it pushes us a conf, we reinit the connections
        #Sleep while waiting for a new conf :)
        self.watch_for_new_conf(self.timeout)

        #Manage a possible time change (our begin_loop will be changed by the diff)
        diff = self.check_for_system_time_change()
        begin_loop += diff

        after = time.time()
        self.timeout -= after - begin_loop

        if self.timeout >= 0:
            return

        print " ======================== "
        after = time.time()
        self.timeout = self.polling_interval

        #Check if zombie workers are among us :)
        #If so : KILL THEM ALL!!!
        self.check_and_del_zombie_workers()

        #Print stats for debug
        for sched_id in self.schedulers:
            sched = self.schedulers[sched_id]
            #In workers we've got actions sent to the queue - queue size
            print '[%d][%s]Stats : Workers:%d (Queued:%d Processing:%d ReturnWait:%d)' % \
                (sched_id, sched['name'], len(self.workers), self.s.qsize(), \
                self.nb_actions_in_workers - self.s.qsize(), len(self.returns_queue))

        #Before returning or getting new actions, see how we managed
        #the old ones : are they still in the queue (s)? If so, we
        #must wait more or at least have more workers
        wait_ratio = self.wait_ratio.get_load()
        if self.s.qsize() != 0 and wait_ratio < 5*self.polling_interval:
            print "I decide to up wait ratio"
            self.wait_ratio.update_load(wait_ratio * 2)
        else:
            #Go back to self.polling_interval on a normal run; if wait_ratio
            #was > 5*self.polling_interval,
            #this brings it back near 5 because if < 5, it goes up :)
            self.wait_ratio.update_load(self.polling_interval)
        wait_ratio = self.wait_ratio.get_load()
        print "Wait ratio:", wait_ratio

        # We can wait longer than polling_interval if needed,
        # but no more than 5*polling_interval and no less than polling_interval
        timeout = self.timeout * wait_ratio
        timeout = max(self.polling_interval, timeout)
        self.timeout = min(5*self.polling_interval, timeout)

        # Maybe we do not have enough workers, we check for it
        # and launch new ones if needed
        self.adjust_worker_number_by_load()

        # Manage all messages we've got in the last timeout
        # for queue in self.return_messages:
        while len(self.returns_queue) != 0:
            self.manage_action_return(self.returns_queue.pop())

        # Now we can get new actions from the schedulers
        self.get_new_actions()

        # We send all finished checks back
        # REF: doc/shinken-action-queues.png (6)
        self.manage_returns()


    def do_post_daemon_init(self):

        # We register our two Pyro interfaces
        self.uri2 = pyro.register(self.daemon, self.interface, "ForArbiter")
        self.uri3 = pyro.register(self.daemon, self.brok_interface, "Broks")

        self.s = Queue() #Global Master -> Slave
        self.manager = Manager()
        self.returns_queue = self.manager.list()


    def main(self):

        self.do_load_config()

        self.do_daemon_init_and_start()

        self.do_post_daemon_init()

        # We wait for the initial conf
        self.wait_for_initial_conf()

        # Connection init with the schedulers
        for sched_id in self.schedulers:
            print "Init main"
            self.pynag_con_init(sched_id)
        self.have_new_conf = False

        # Allocate Mortal Threads
        for _ in xrange(1, self.min_workers):
            self.create_and_launch_worker()

        # Now the main loop
        self.timeout = self.polling_interval

        self.do_mainloop()