2 #Copyright (C) 2009-2010 :
3 # Gabes Jean, naparuba@gmail.com
4 # Gerhard Lausser, Gerhard.Lausser@consol.de
6 #This file is part of Shinken.
8 #Shinken is free software: you can redistribute it and/or modify
9 #it under the terms of the GNU Affero General Public License as published by
10 #the Free Software Foundation, either version 3 of the License, or
11 #(at your option) any later version.
13 #Shinken is distributed in the hope that it will be useful,
14 #but WITHOUT ANY WARRANTY; without even the implied warranty of
15 #MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 #GNU Affero General Public License for more details.
18 #You should have received a copy of the GNU Affero General Public License
19 #along with Shinken. If not, see <http://www.gnu.org/licenses/>.
22 #This class is use to mnager modules and call callback
31 from multiprocessing
import Process
, Queue
33 #modulepath = os.path.join(os.path.dirname(imp.find_module("pluginloader")[1]), "modules/")
34 #Thanks http://pytute.blogspot.com/2007/04/python-plugin-system.html
36 class ModulesManager(object):
38 def __init__(self
, modules_type
, modules_path
, modules
):
39 self
.modules_path
= modules_path
40 self
.modules_type
= modules_type
41 self
.modules
= modules
42 self
.allowed_types
= [plug
.module_type
for plug
in self
.modules
]
47 #We get all modules file of our type (end with broker.py for example)
48 modules_files
= [fname
[:-3] for fname
in os
.listdir(self
.modules_path
) if fname
.endswith(self
.modules_type
+".py")]
50 #And directories (no remove of .py but still with broker for example at the end)
51 modules_files
.extend([fname
for fname
in os
.listdir(self
.modules_path
) if fname
.endswith(self
.modules_type
)])
53 #Now we try to load thems
54 if not self
.modules_path
in sys
.path
:
55 sys
.path
.append(self
.modules_path
)
57 self
.imported_modules
= []
58 for fname
in modules_files
:
60 self
.imported_modules
.append(__import__(fname
))
61 except ImportError , exp
:
62 print "Warning :", exp
64 self
.modules_assoc
= []
65 for module
in self
.modules
:
66 module_type
= module
.module_type
68 for mod
in self
.imported_modules
:
69 if mod
.properties
['type'] == module_type
:
70 self
.modules_assoc
.append((module
, mod
))
73 #No module is suitable, we Raise a Warning
74 print "Warning : the module type %s for %s was not found in modules!" % (module_type
, module
.get_name())
77 #Set an exit function that is call when we quit
78 def set_exit_handler(self
, inst
):
79 func
= self
.manage_signal
83 win32api
.SetConsoleCtrlHandler(func
, True)
85 version
= ".".join(map(str, sys
.version_info
[:2]))
86 raise Exception("pywin32 not installed for Python " + version
)
89 signal
.signal(signal
.SIGTERM
, func
)
93 #Get modules instance to give them after broks
94 def get_instances(self
):
96 for (module
, mod
) in self
.modules_assoc
:
98 inst
= mod
.get_instance(module
)
99 if inst
!= None: #None = Bad thing happened :)
100 #the instance need the properties of the module
101 inst
.properties
= mod
.properties
102 self
.instances
.append(inst
)
103 except Exception , exp
:
104 print "Error : the module %s raised an exception %s, I remove it!" % (module
.get_name(), str(exp
))
105 print "Back trace of this remove :"
106 traceback
.print_exc(file=sys
.stdout
)
108 print "Load", len(self
.instances
), "module instances"
111 for inst
in self
.instances
:
113 if 'external' in inst
.properties
and inst
.properties
['external']:
114 inst
.properties
['to_queue'] = Queue()
115 inst
.properties
['from_queue'] = Queue()
117 inst
.properties
['process'] = Process(target
=inst
.main
, args
=())
118 inst
.properties
['process'].start()
119 print "Starting external process (pid:%d) for instance %s" % (inst
.properties
['process'].pid
, inst
.get_name())
121 inst
.properties
['external'] = False
123 except Exception , exp
:
124 print "Error : the instance %s raised an exception %s, I remove it!" % (inst
.get_name(), str(exp
))
125 print "Back trace of this remove :"
126 traceback
.print_exc(file=sys
.stdout
)
131 self
.instances
.remove(inst
)
133 return self
.instances
135 def remove_instance(self
, inst
):
136 #External instances need to be close before (process + queues)
137 if inst
.properties
['external']:
138 inst
.properties
['process'].terminate()
139 inst
.properties
['process'].join(timeout
=1)
140 inst
.properties
['to_queue'].close()
141 inst
.properties
['to_queue'].join_thread()
142 inst
.properties
['from_queue'].close()
143 inst
.properties
['from_queue'].join_thread()
145 #Then do not listen anymore about it
146 self
.instances
.remove(inst
)
149 def check_alive_instances(self
):
152 for inst
in self
.instances
:
153 if inst
.properties
['external'] and not inst
.properties
['process'].is_alive():
154 print "Error : the external module %s goes down unexpectly!" % inst
.get_name()
158 self
.remove_instance(inst
)
161 def get_internal_instances(self
, phase
=None):
162 return [inst
for inst
in self
.instances
if not inst
.properties
['external'] and (phase
==None or phase
in inst
.properties
['phases'])]
165 def get_external_instances(self
, phase
=None):
166 return [inst
for inst
in self
.instances
if inst
.properties
['external'] and (phase
==None or phase
in inst
.properties
['phases'])]
169 def get_external_to_queues(self
, phase
=None):
170 return [inst
.properties
['to_queue'] for inst
in self
.instances
if inst
.properties
['external'] and (phase
==None or phase
in inst
.properties
['phases'])]
173 def get_external_from_queues(self
, phase
=None):
174 return [inst
.properties
['from_queue'] for inst
in self
.instances
if inst
.properties
['external'] and (phase
==None or phase
in inst
.properties
['phases'])]
178 #Ask internal to quit if they can
179 for inst
in self
.get_internal_instances():
180 if hasattr(inst
, 'quit') and callable(inst
.quit
):
182 for inst
in self
.get_external_instances():
183 self
.remove_instance(inst
)