Add: add a bailout for workers if they got toomanyopenfiles errors.
[shinken.git] / shinken / modulesmanager.py
blob50075b2010088a3b170be472141b53b27461326a
1 #!/usr/bin/env python
2 #Copyright (C) 2009-2010 :
3 # Gabes Jean, naparuba@gmail.com
4 # Gerhard Lausser, Gerhard.Lausser@consol.de
6 #This file is part of Shinken.
8 #Shinken is free software: you can redistribute it and/or modify
9 #it under the terms of the GNU Affero General Public License as published by
10 #the Free Software Foundation, either version 3 of the License, or
11 #(at your option) any later version.
13 #Shinken is distributed in the hope that it will be useful,
14 #but WITHOUT ANY WARRANTY; without even the implied warranty of
15 #MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 #GNU Affero General Public License for more details.
18 #You should have received a copy of the GNU Affero General Public License
19 #along with Shinken. If not, see <http://www.gnu.org/licenses/>.
22 #This class is used to manage modules and call their callbacks
25 import os
26 import os.path
27 import sys
28 import traceback
31 from multiprocessing import Process, Queue
33 #modulepath = os.path.join(os.path.dirname(imp.find_module("pluginloader")[1]), "modules/")
34 #Thanks http://pytute.blogspot.com/2007/04/python-plugin-system.html
class ModulesManager(object):
    """Import the module files of one type and manage their instances.

    Typical use: build the manager, call load() to import the python
    modules found in modules_path, then get_instances() to create and
    initialise one instance per configured module.

    modules_type -- filename suffix of the modules to import ("broker"
                    selects "*broker.py" files and "*broker" entries).
    modules_path -- directory scanned for the modules; it is appended to
                    sys.path by load().
    modules      -- configured module objects; each must expose a
                    module_type attribute and a get_name() method.
    """

    def __init__(self, modules_type, modules_path, modules):
        self.modules_path = modules_path
        self.modules_type = modules_type
        self.modules = modules
        self.allowed_types = [plug.module_type for plug in self.modules]
        #Defined here so stop_all() or check_alive_instances() can be
        #called safely even if load()/get_instances() never ran
        self.imported_modules = []
        self.modules_assoc = []
        self.instances = []


    #Load all modules
    def load(self):
        """Import every candidate module and pair it with its configuration.

        Fills self.imported_modules with the python modules that could be
        imported and self.modules_assoc with (configured module, python
        module) tuples whose types match. Import failures and configured
        modules without a matching python module only print a warning.
        """
        suffix = self.modules_type
        #List the directory once so both selections see the same snapshot
        entries = os.listdir(self.modules_path)

        #We get all modules file of our type (end with broker.py for example)
        candidates = [fname[:-3] for fname in entries if fname.endswith(suffix + ".py")]
        #And directories (no remove of .py but still with broker for example at the end)
        candidates.extend([fname for fname in entries if fname.endswith(suffix)])

        #A name can be selected twice (xbroker.py next to a xbroker
        #directory); import it only once or it would be associated twice
        names = []
        for name in candidates:
            if name not in names:
                names.append(name)

        #Now we try to load them
        if self.modules_path not in sys.path:
            sys.path.append(self.modules_path)

        self.imported_modules = []
        for name in names:
            try:
                mod = __import__(name)
            except ImportError as exp:
                print("Warning : %s" % (exp,))
                continue
            #Without a properties description the type cannot be matched
            #below, so skip the file instead of crashing the whole load
            if not hasattr(mod, 'properties'):
                print("Warning : the module file %s has no properties, I skip it" % name)
                continue
            self.imported_modules.append(mod)

        self.modules_assoc = []
        for module in self.modules:
            module_type = module.module_type
            is_find = False
            for mod in self.imported_modules:
                if mod.properties.get('type') == module_type:
                    self.modules_assoc.append((module, mod))
                    is_find = True
            if not is_find:
                #No module is suitable, we Raise a Warning
                print("Warning : the module type %s for %s was not found in modules!" % (module_type, module.get_name()))


    #Set an exit function that is call when we quit
    def set_exit_handler(self, inst):
        """Register the termination handler for this process.

        On Windows the handler is installed with
        win32api.SetConsoleCtrlHandler, elsewhere it is bound to SIGTERM.
        Raises Exception when running on Windows without pywin32.
        """
        #NOTE(review): this class defines no manage_signal itself, so
        #unless a subclass adds one the original lookup on self could only
        #raise AttributeError while the inst parameter stayed unused. Keep
        #self.manage_signal when it exists, else use the handler of the
        #given instance -- confirm this is the intended target.
        func = getattr(self, 'manage_signal', None)
        if func is None:
            func = inst.manage_signal

        if os.name == "nt":
            try:
                import win32api
            except ImportError:
                version = ".".join(map(str, sys.version_info[:2]))
                raise Exception("pywin32 not installed for Python " + version)
            win32api.SetConsoleCtrlHandler(func, True)
        else:
            import signal
            signal.signal(signal.SIGTERM, func)


    #Get modules instance to give them after broks
    def get_instances(self):
        """Create, initialise and return the module instances.

        One instance is requested from each associated python module.
        Instances whose properties flag them as 'external' get two queues
        ('to_queue' and 'from_queue') and a process running inst.main();
        the others are only initialised. An instance that raises during
        this step is dropped from the returned list.
        """
        self.instances = []
        for (module, mod) in self.modules_assoc:
            try:
                inst = mod.get_instance(module)
                if inst is not None: #None = Bad thing happened :)
                    #The instance needs the properties of the module. Give
                    #it a private copy: the queues and the process stored
                    #below are per instance, and sharing the module-level
                    #dict made two instances of the same module overwrite
                    #each other's entries.
                    inst.properties = mod.properties.copy()
                    self.instances.append(inst)
            except Exception as exp:
                print("Error : the module %s raised an exception %s, I remove it!" % (module.get_name(), str(exp)))
                print("Back trace of this remove :")
                traceback.print_exc(file=sys.stdout)

        print("Load %d module instances" % len(self.instances))

        to_del = []
        for inst in self.instances:
            try:
                if inst.properties.get('external'):
                    inst.properties['to_queue'] = Queue()
                    inst.properties['from_queue'] = Queue()
                    inst.init()
                    process = Process(target=inst.main, args=())
                    inst.properties['process'] = process
                    process.start()
                    print("Starting external process (pid:%d) for instance %s" % (process.pid, inst.get_name()))
                else:
                    inst.properties['external'] = False
                    inst.init()
            except Exception as exp:
                print("Error : the instance %s raised an exception %s, I remove it!" % (inst.get_name(), str(exp)))
                print("Back trace of this remove :")
                traceback.print_exc(file=sys.stdout)
                to_del.append(inst)

        for inst in to_del:
            #Queues (and maybe a process) can already exist when the
            #failure happened; release them or their pipes stay open
            try:
                self._release_external(inst)
            except Exception as exp:
                print("Warning : cannot clean the instance %s : %s" % (inst.get_name(), str(exp)))
            self.instances.remove(inst)

        return self.instances


    #Stop the process and close the queues of an instance, if it has any
    def _release_external(self, inst):
        """Terminate the process of inst and close its two queues.

        Each resource is looked up in inst.properties and skipped when it
        is absent, so this is safe on a partially started instance.
        """
        props = inst.properties
        process = props.get('process')
        #terminate()/join() are only valid on a started process, and
        #is_alive() is False for one that never started
        if process is not None and process.is_alive():
            process.terminate()
            process.join(timeout=1)
        for key in ('to_queue', 'from_queue'):
            queue = props.get(key)
            if queue is not None:
                queue.close()
                queue.join_thread()


    def remove_instance(self, inst):
        """Stop listening to inst, releasing its process and queues first.

        Raises ValueError when inst is not a managed instance.
        """
        #External instances need to be close before (process + queues)
        if inst.properties['external']:
            self._release_external(inst)

        #Then do not listen anymore about it
        self.instances.remove(inst)


    def check_alive_instances(self):
        """Remove every external instance whose process is no longer alive."""
        #Only for external. Collect first: remove_instance() mutates
        #self.instances, which must not happen while iterating over it
        to_del = []
        for inst in self.instances:
            if inst.properties['external'] and not inst.properties['process'].is_alive():
                print("Error : the external module %s goes down unexpectly!" % inst.get_name())
                to_del.append(inst)

        for inst in to_del:
            self.remove_instance(inst)


    #Common selection code of the four getters below
    def _select(self, external, phase):
        """Return the instances of one kind, optionally limited to a phase.

        external -- True for external instances, False for internal ones.
        phase    -- None for no filtering, else only the instances listing
                    this value in properties['phases'].
        """
        selected = []
        for inst in self.instances:
            if bool(inst.properties['external']) != external:
                continue
            if phase is None or phase in inst.properties['phases']:
                selected.append(inst)
        return selected


    def get_internal_instances(self, phase=None):
        """Return the internal instances, optionally limited to a phase."""
        return self._select(False, phase)


    def get_external_instances(self, phase=None):
        """Return the external instances, optionally limited to a phase."""
        return self._select(True, phase)


    def get_external_to_queues(self, phase=None):
        """Return the 'to_queue' of each selected external instance."""
        return [inst.properties['to_queue'] for inst in self._select(True, phase)]


    def get_external_from_queues(self, phase=None):
        """Return the 'from_queue' of each selected external instance."""
        return [inst.properties['from_queue'] for inst in self._select(True, phase)]


    def stop_all(self):
        """Ask internal instances to quit and remove the external ones."""
        #Ask internal to quit if they can
        for inst in self.get_internal_instances():
            if hasattr(inst, 'quit') and callable(inst.quit):
                inst.quit()
        #get_external_instances() returns a new list, so removing from
        #self.instances inside this loop is safe
        for inst in self.get_external_instances():
            self.remove_instance(inst)