#Copyright (C) 2009-2010 :
#    Gabes Jean, naparuba@gmail.com
#    Gerhard Lausser, Gerhard.Lausser@consol.de
#    Gregory Starck, g.starck@gmail.com

#This file is part of Shinken.

#Shinken is free software: you can redistribute it and/or modify
#it under the terms of the GNU Affero General Public License as published by
#the Free Software Foundation, either version 3 of the License, or
#(at your option) any later version.

#Shinken is distributed in the hope that it will be useful,
#but WITHOUT ANY WARRANTY; without even the implied warranty of
#MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
#GNU Affero General Public License for more details.

#You should have received a copy of the GNU Affero General Public License
#along with Shinken. If not, see <http://www.gnu.org/licenses/>.
#This module is an interface for reactionners and pollers.
#The satellite listens for its configuration from the Arbiter on a port.
#The configuration given by the arbiter is the set of schedulers the
#satellite will take actions from.
#Once launched and configured, the satellite keeps listening to the arbiter:
#if the arbiter wants it to take a new conf, the satellite forgets the old
#schedulers (and the actions they gave), takes the new ones and does the (new) job.
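#Rough data flow handled in this file:
#  Arbiter --put_conf()--> IForArbiter --> self.schedulers (one entry per scheduler)
#  Satellite --get_checks()--> scheduler: each action is wrapped in a Message
#  and pushed on the master->worker queue (self.s), a Worker runs it and pushes
#  the result on returns_queue, manage_action_return() buffers it in
#  wait_homerun, and manage_returns() finally sends it back with put_results().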
from Queue import Empty
from multiprocessing import Queue, Manager, active_children

# Standard modules used below
import os
import sys
import time
import copy
import random
import cPickle

try:
    import shinken.pyro_wrapper as pyro
except ImportError:
    print "Shinken requires the Python Pyro module. Please install it."
    sys.exit(1)

Pyro = pyro.Pyro   # the wrapper also carries the Pyro module itself

from message import Message
from worker import Worker
from load import Load
from daemon import Daemon
from log import logger
from brok import Brok
from check import Check
from notification import Notification
from eventhandler import EventHandler
#Interface for Arbiter, our big MASTER
class IForArbiter(Pyro.core.ObjBase):
    #We keep the app link because we are just here for it
    def __init__(self, app):
        Pyro.core.ObjBase.__init__(self)
        self.app = app
        self.schedulers = app.schedulers
    #Function called by the arbiter to give us our conf.
    #conf must be a dict with:
    #'schedulers' : schedulers dict (by id) with address and port
    #TODO: catch the case where the Arbiter sends something we already have
    #(same id+address+port) -> just do nothing :)
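    #Illustrative shape of the conf dict (only the keys read below are shown,
    #a real configuration may carry more):
    #  {'schedulers': {0: {'name': ..., 'address': ..., 'port': ..., 'active': True}},
    #   'global': {'poller_name' or 'reactionner_name': ...,
    #              'max_workers': ..., 'min_workers': ..., 'processes_by_worker': ...,
    #              'polling_interval': ..., 'poller_tags': [...],
    #              'max_plugins_output_length': ..., 'use_timezone': 'NOTSET'}}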
    def put_conf(self, conf):
        self.app.have_conf = True
        self.app.have_new_conf = True
        #Get our name from the globals
        if 'poller_name' in conf['global']:
            self.name = conf['global']['poller_name']
        elif 'reactionner_name' in conf['global']:
            self.name = conf['global']['reactionner_name']
        else:
            self.name = 'Unnamed satellite'
        self.app.name = self.name

        print "[%s] The arbiter sent us a configuration %s" % (self.name, conf)
        #If we already manage a scheduler, we keep the results waiting to be
        #sent back to it instead of losing them
        for sched_id in conf['schedulers']:
            if sched_id in self.schedulers:
                logger.log("[%s] We already got the conf %d (%s)" % (self.name, sched_id, conf['schedulers'][sched_id]['name']))
                wait_homerun = self.schedulers[sched_id]['wait_homerun']
            else:
                wait_homerun = {}

            s = conf['schedulers'][sched_id]
            self.schedulers[sched_id] = s

            uri = pyro.create_uri(s['address'], s['port'], 'Checks', self.app.use_ssl)
            print "DBG: scheduler URI:", uri

            self.schedulers[sched_id]['uri'] = uri
            self.schedulers[sched_id]['wait_homerun'] = wait_homerun
            self.schedulers[sched_id]['running_id'] = 0
            self.schedulers[sched_id]['active'] = s['active']

            #And then we connect to it :)
            self.app.pynag_con_init(sched_id)
        #Now the global parameters
        self.app.max_workers = conf['global']['max_workers']
        self.app.min_workers = conf['global']['min_workers']
        self.app.processes_by_worker = conf['global']['processes_by_worker']
        self.app.polling_interval = conf['global']['polling_interval']
        if 'poller_tags' in conf['global']:
            self.app.poller_tags = conf['global']['poller_tags']
        else: #a reactionner has no poller tags
            self.app.poller_tags = []
        if 'max_plugins_output_length' in conf['global']:
            self.app.max_plugins_output_length = conf['global']['max_plugins_output_length']
        else: #for a reactionner, we don't really care about it
            self.app.max_plugins_output_length = 8192
        print "Max plugins output length:", self.app.max_plugins_output_length

        #Set the timezone the arbiter gave us
        use_timezone = conf['global']['use_timezone']
        if use_timezone != 'NOTSET':
            logger.log("[%s] Setting our timezone to %s" % (self.name, use_timezone))
            os.environ['TZ'] = use_timezone
            time.tzset()

        logger.log("We have our schedulers : %s" % (str(self.schedulers)))
    #The arbiter asks us to not manage a scheduler_id anymore
    #I do it and don't ask why
    def remove_from_conf(self, sched_id):
        try:
            del self.schedulers[sched_id]
        except KeyError:
            pass
    #The arbiter asks me which sched_id I manage. If it is not OK with it,
    #it will ask me to remove one or more sched_id
    def what_i_managed(self):
        return self.schedulers.keys()
    #Used by the arbiter to know if we are alive
    def ping(self):
        print "We got asked for a ping"
        return True
    #Used by the arbiter to know if we have a conf or not.
    #It can be useful when we must do nothing but are not aware of it,
    #because the arbiter can KILL US for it!
    def have_conf(self):
        return self.app.have_conf
    #Called by the arbiter if it thinks we are running but we must not (like
    #if I was a spare that took a conf but the master came back, I must die
    #and wait for a new conf)
    #Arbiter: I don't care, hasta la vista baby!
    #Us: ... <- Nothing! We are dead already! Don't you follow
    #anything? Reading code is not a job for eyes only...
    def wait_new_conf(self):
        print "Arbiter wants me to wait for a new conf"
        self.schedulers.clear()
        self.app.have_conf = False
#Interface for Brokers
#They connect here and get all broks (data for brokers).
#Data must be ORDERED! (initial status BEFORE update...)
class IBroks(Pyro.core.ObjBase):
    #We keep the app link because we are just here for it
    def __init__(self, app):
        Pyro.core.ObjBase.__init__(self)
        self.app = app
        self.running_id = random.random()

    #The broker checks this id: if it changed, we restarted
    #and the broker must void its broks
    def get_running_id(self):
        return self.running_id
    #A broker asks us for the broks we have
    def get_broks(self):
        #print "We got asked for broks"
        res = self.app.get_broks()
        return res
class Satellite(Daemon):
    def __init__(self, config_file, is_daemon, do_replace, debug, debug_file):

        Daemon.__init__(self, config_file, is_daemon, do_replace, debug, debug_file)

        # Keep broks so they can be eaten by a broker
        self.broks = {}

        self.workers = {}   # dict of active workers

        self.nb_actions_in_workers = 0

        # Init stats like Load for workers
        self.wait_ratio = Load(initial_value=1)

        self.t_each_loop = time.time()   # used to track system time change

        # Now the specific stuff
        # Bool to know if we have received conf from arbiter
        self.have_conf = False
        self.have_new_conf = False

        # Now we create the interfaces
        self.interface = IForArbiter(self)
        self.brok_interface = IBroks(self)

        # Just for having these attributes defined here. explicit > implicit ;)
        self.uri2 = None
        self.uri3 = None
        self.s = None
        self.manager = None
        self.returns_queue = None
    def pynag_con_init(self, id):
        """ Initialize or re-initialize the connection with a scheduler """
        sched = self.schedulers[id]
        #If sched is not active, I do not try to init: it is just useless
        if not sched['active']:
            return

        logger.log("[%s] Init of connection with %s at %s" % (self.name, sched['name'], sched['uri']))
        running_id = sched['running_id']
        sch_con = sched['con'] = Pyro.core.getProxyForURI(sched['uri'])

        #Get the scheduler running id to know if it restarted since last time
        try:
            pyro.set_timeout(sch_con, 5)
            new_run_id = sch_con.get_running_id()
        except (Pyro.errors.ProtocolError, Pyro.errors.NamingError, cPickle.PicklingError, KeyError, Pyro.errors.CommunicationError), exp:
            logger.log("[%s] Scheduler %s is not initialized or got a network problem: %s" % (self.name, sched['name'], str(exp)))
            sched['con'] = None
            return

        #The scheduler has been restarted: it has a new run_id.
        #So we clear all verifs, they are obsolete now.
        if sched['running_id'] != 0 and new_run_id != running_id:
            logger.log("[%s] The running id of the scheduler %s changed, we must clear its actions" % (self.name, sched['name']))
            sched['wait_homerun'].clear()
        sched['running_id'] = new_run_id
        logger.log("[%s] Connection OK with scheduler %s" % (self.name, sched['name']))
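    #Note: the same running_id idea is used in the other direction by
    #IBroks.get_running_id() above, so a broker can detect that this satellite
    #restarted and must void its broks.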
    #Manage the action returns coming from the Workers:
    #we just put them into the sched they are for,
    #and we clean unused properties like sched_id
    def manage_action_return(self, action):
        #Ok, it's a result. We get it and fill the wait_homerun of the good sched_id
        sched_id = action.sched_id
        #Now we know where to put the action, we do not need sched_id anymore
        del action.sched_id
        action.status = 'waitforhomerun'
        self.schedulers[sched_id]['wait_homerun'][action.get_id()] = action
        #We update the stats
        self.nb_actions_in_workers -= 1
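    #Life cycle of an action on this side: pulled from a scheduler by
    #get_new_actions() (REF 1/2), wrapped in a Message and queued on self.s,
    #run by a Worker, popped from returns_queue in do_loop_turn(), buffered
    #above in wait_homerun with status 'waitforhomerun', and finally sent back
    #to its scheduler by manage_returns() (REF 6).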
    #Return the finished checks to their scheduler and clean them
    #REF: doc/shinken-action-queues.png (6)
    def manage_returns(self):
        #For all schedulers, we check for waitforhomerun and we send the results back
        for sched_id in self.schedulers:
            sched = self.schedulers[sched_id]
            #If sched is not active, I do not try to return
            if not sched['active']:
                continue

            #Now ret has all the verifs, we can return them
            send_ok = False
            ret = sched['wait_homerun'].values()
            try:
                con = sched['con']
                if con is not None: #None = not initialized
                    send_ok = con.put_results(ret)
            #Not connected or sched is gone
            except (Pyro.errors.ProtocolError, KeyError), exp:
                print exp
                self.pynag_con_init(sched_id)
                return
            except AttributeError, exp: #the scheduler must not be initialized
                print exp
            except Exception, exp:
                print ''.join(Pyro.util.getPyroTraceback(exp))

            #We clean ONLY if the send is OK
            if send_ok:
                sched['wait_homerun'].clear()
            else:
                self.pynag_con_init(sched_id)
                logger.log("Sent failed!")
    #Used to wait for a conf from the arbiter.
    #It sends the conf to us through our daemon port and sets the have_conf
    #prop if it sends us something (it can also just do a ping)
    def wait_for_initial_conf(self):
        logger.log("Waiting for initial configuration")
        timeout = 1.0
        #The arbiter did not set our have_conf param yet
        while not self.have_conf and not self.interrupted:
            socks = pyro.get_sockets(self.daemon)
            ins = self.get_socks_activity(socks, timeout)

            #Manage a possible time change (our timeout will be adjusted with the diff)
            diff = self.check_for_system_time_change()

            if ins != []:
                for sock in ins:
                    pyro.handleRequests(self.daemon, sock)
                    timeout = timeout - diff
                    break # no need to continue with the for loop
            else:
                #Still nothing: show we are waiting
                sys.stdout.write(".")
                sys.stdout.flush()
    #The arbiter can send us a new conf on the daemon port.
    #We do not want to lose time on it, so it is not a blocking wait
    #(the timeout is the one given by the caller).
    #If it sends us a new conf, we reinit the connections of all schedulers
    def watch_for_new_conf(self, timeout_daemon):
        socks = pyro.get_sockets(self.daemon)
        ins = self.get_socks_activity(socks, timeout_daemon)
        for sock in ins:
            pyro.handleRequests(self.daemon, sock)

        #have_new_conf is set by put_conf,
        #so another handle will not make a con_init
        if self.have_new_conf:
            for sched_id in self.schedulers:
                print "Got a new conf"
                self.pynag_con_init(sched_id)
            self.have_new_conf = False
    #Check if our system time changed. If so, we have to compensate for it.
    def check_for_system_time_change(self):
        now = time.time()
        difference = now - self.t_each_loop
        #If we have more than a 15 min time change, we need to compensate
        if abs(difference) > 900:
            self.compensate_system_time_change(difference)

        #Now set the new value for the tick loop
        self.t_each_loop = now

        #Return the diff if there is one, or just 0
        if abs(difference) > 900:
            return difference
        else:
            return 0
    #If we've got a system time change, we need to compensate for it.
    #For now, we do not actually do anything here.
    def compensate_system_time_change(self, difference):
        logger.log('Warning: A system time change of %s has been detected. Compensating...' % difference)
        #We only need to change some values
    #Create and launch a new worker, and put it into self.workers.
    #It can be mortal or not
    def create_and_launch_worker(self, mortal=True):
        w = Worker(1, self.s, self.returns_queue, self.processes_by_worker,
                   mortal=mortal, max_plugins_output_length=self.max_plugins_output_length)
        self.workers[w.id] = w
        logger.log("[%s] Allocating new Worker : %s" % (self.name, w.id))
        self.workers[w.id].start()
    #Stop all workers and close all network connections
    def do_stop(self):
        logger.log('Stopping all workers')
        for w in self.workers.values():
            try:
                w.terminate()
                w.join(timeout=1)
            #queue = w.return_queue
            #self.return_messages.remove(queue)
            except AttributeError: #the worker already died
                pass
            except AssertionError: #we are inside a worker
                pass
        logger.log('Stopping all network connections')
        self.daemon.disconnect(self.interface)
        self.daemon.disconnect(self.brok_interface)
        self.daemon.shutdown(True)
    #A simple function to add objects to self,
    #like broks in self.broks, etc.
    #TODO: better tag ID?
    def add(self, elt):
        if isinstance(elt, Brok):
            #For a brok, we TAG the brok with our instance_id
            elt.data['instance_id'] = 0
            self.broks[elt.id] = elt
    #Someone asks us for our broks. We send them and clean the queue
    def get_broks(self):
        res = copy.copy(self.broks)
        self.broks.clear()
        return res
    #Workers are processes, they can die in numerous ways, like:
    #* 99.99% : bug in the code, sorry :p
    #* 0.005% : a stupid admin (or an admin without coffee)
    #* 0.005% : alien attack
    #So they need to be detected, and restarted if needed
    def check_and_del_zombie_workers(self):
        #active_children() joins every finished child, useful :)
        act = active_children()

        w_to_del = []
        for w in self.workers.values():
            #If a worker went down without us asking, it's not good:
            #we can think we still have a worker when it's not true,
            #so we remove it
            if not w.is_alive():
                logger.log("[%s] Warning : the worker %s went down unexpectedly!" % (self.name, w.id))
                #AIM ... Press FIRE ... <B>HEAD SHOT!</B>
                w.terminate()
                w.join(timeout=1)
                w_to_del.append(w.id)

        #OK, now really delete the dead workers
        for w_id in w_to_del:
            del self.workers[w_id]
    #Here we create new workers if the queue load (len of verifs) is too long
    def adjust_worker_number_by_load(self):
        #TODO: get a real value for the load
        wish_worker = 1   # placeholder until the TODO above is addressed
        #I want at least min_workers or wish_worker (the biggest), but not more than max_workers
        while len(self.workers) < self.min_workers \
                  or (wish_worker > len(self.workers) and len(self.workers) < self.max_workers):
            self.create_and_launch_worker()
        #TODO: if len(workers) > 2*wish, maybe we can kill a worker?
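    #Example: with the placeholder wish_worker above, the loop simply tops the
    #pool up to min_workers; the wish_worker term only starts to matter once
    #the TODO gives it a real load-based value bigger than the current pool.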
    #We get the new actions from the schedulers, we create a Message for each
    #and we put it in the s queue (from master to slave)
    #REF: doc/shinken-action-queues.png (1)
    def get_new_actions(self):
        #Here are the differences between a
        #poller and a reactionner:
        #a poller will only do checks,
        #a reactionner only does actions
        do_checks = self.__class__.do_checks
        do_actions = self.__class__.do_actions

        #We check for new checks in each scheduler and put them in the worker queue
        for sched_id in self.schedulers:
            sched = self.schedulers[sched_id]
            #If sched is not active, I do not try to get anything from it
            if not sched['active']:
                continue

            try:
                con = sched['con']
                if con is not None: #None = not initialized
                    pyro.set_timeout(con, 120)
                    tmp = con.get_checks(do_checks=do_checks, do_actions=do_actions, poller_tags=self.poller_tags)
                    print "Ask actions to", sched_id, "got", len(tmp)
                    #We 'tag' them with sched_id and put them into the queue for workers
                    #REF: doc/shinken-action-queues.png (2)
                    for a in tmp:
                        a.sched_id = sched_id
                        msg = Message(id=0, type='Do', data=a)
                        self.s.put(msg)
                        #Update stats
                        self.nb_actions_in_workers += 1
                else: #no con? make the connection
                    self.pynag_con_init(sched_id)
            #Ok, con is not known, so we create it,
            #or maybe the connection was lost, so we recreate it
            except (KeyError, Pyro.errors.ProtocolError), exp:
                print exp
                self.pynag_con_init(sched_id)
            #The scheduler must not be initialized,
            #or the scheduler must not have checks
            except (AttributeError, Pyro.errors.NamingError), exp:
                print exp
            #What the F**k? We do not know what happened, so we reconnect
            except Pyro.errors.ConnectionClosedError, exp:
                print exp
                self.pynag_con_init(sched_id)
            except Exception, exp:
                print ''.join(Pyro.util.getPyroTraceback(exp))
    def do_loop_turn(self):
        begin_loop = time.time()

        #Maybe the arbiter asked us to wait for a new conf.
        #If so, we must restart everything...
        if self.have_conf == False:
            print "Begin wait initial"
            self.wait_for_initial_conf()
            print "End wait initial"
            #for sched_id in self.schedulers:
            #    self.pynag_con_init(sched_id)

        #Now we check if the arbiter spoke to us on the daemon port.
        #If so, we listen to it.
        #When it pushes us a conf, we reinit the connections.
        #Sleep while waiting for a new conf :)
        self.watch_for_new_conf(self.timeout)

        #Manage a possible time change (our elapsed time will be adjusted with the diff)
        diff = self.check_for_system_time_change()
        after = time.time()
        after += diff

        self.timeout -= after - begin_loop

        #If there is still some time left in this turn, just come back later
        if self.timeout >= 0:
            return

        print " ======================== "

        self.timeout = self.polling_interval

        #Check if zombie workers are among us :)
        #If so: KILL THEM ALL!!!
        self.check_and_del_zombie_workers()

        #Print stats for debug
        for sched_id in self.schedulers:
            sched = self.schedulers[sched_id]
            #In workers we've got "actions sent to the queue" - "queue size"
            print '[%d][%s]Stats : Workers:%d (Queued:%d Processing:%d ReturnWait:%d)' % \
                (sched_id, sched['name'], len(self.workers), self.s.qsize(), \
                self.nb_actions_in_workers - self.s.qsize(), len(self.returns_queue))
        #Before returning or getting new actions, see how we managed
        #the old ones: are they still in the queue (s)? If so, we
        #must wait more, or at least have more workers
        wait_ratio = self.wait_ratio.get_load()
        if self.s.qsize() != 0 and wait_ratio < 5*self.polling_interval:
            print "I decide to up wait ratio"
            self.wait_ratio.update_load(wait_ratio * 2)
        else:
            #Come back toward self.polling_interval on a normal run; if wait_ratio
            #was > 5*self.polling_interval, this makes it come back near 5,
            #because if it is < 5 it goes up again :)
            self.wait_ratio.update_load(self.polling_interval)
        wait_ratio = self.wait_ratio.get_load()
        print "Wait ratio:", wait_ratio

        # We can wait more than 1s if needed,
        # no more than 5s, but no less than 1
        timeout = self.timeout * wait_ratio
        timeout = max(self.polling_interval, timeout)
        self.timeout = min(5*self.polling_interval, timeout)
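        #Worked example with polling_interval = 1s: while the input queue stays
        #non-empty the wait_ratio doubles each turn (1 -> 2 -> 4 ...), so the
        #computed timeout grows, but the clamping above always keeps
        #self.timeout between polling_interval (1s) and 5*polling_interval (5s).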
        # Maybe we do not have enough workers: we check for it
        # and launch new ones if needed
        self.adjust_worker_number_by_load()

        # Manage all the action returns we got during the last timeout
        # for queue in self.return_messages:
        while len(self.returns_queue) != 0:
            self.manage_action_return(self.returns_queue.pop())

        # Now we can get new actions from the schedulers
        self.get_new_actions()

        # We send all finished checks back
        # REF: doc/shinken-action-queues.png (6)
        self.manage_returns()
    def do_post_daemon_init(self):
        # We register our interfaces
        self.uri2 = pyro.register(self.daemon, self.interface, "ForArbiter")
        self.uri3 = pyro.register(self.daemon, self.brok_interface, "Broks")

        self.s = Queue() # Global Master -> Slave
        self.manager = Manager()
        self.returns_queue = self.manager.list()
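        # self.s is the multiprocessing Queue used master -> workers (Messages
        # wrapping the actions to run), while returns_queue is a Manager().list()
        # shared the other way: the workers append finished actions to it and
        # do_loop_turn() pops them into manage_action_return().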
    def main(self):
        self.do_load_config()

        self.do_daemon_init_and_start()

        self.do_post_daemon_init()

        # We wait for the initial conf
        self.wait_for_initial_conf()

        # Connection init with the schedulers
        for sched_id in self.schedulers:
            self.pynag_con_init(sched_id)
        self.have_new_conf = False

        # Allocate the mortal Workers
        for _ in xrange(1, self.min_workers):
            self.create_and_launch_worker()

        # Prepare the main loop timing
        self.timeout = self.polling_interval