typo error
[pp3.git] / ppserver.py
blobbf6e18ed6ce69e760cc3a35acf34c5c27f51200f
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3 #Copyright (c) 2009 Ondřej Súkup
4 #Parallel Python Software: http://www.parallelpython.com
5 # Copyright (c) 2005-2009, Vitalii Vanovschi
6 # All rights reserved.
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions are met:
9 # * Redistributions of source code must retain the above copyright notice,
10 # this list of conditions and the following disclaimer.
11 # * Redistributions in binary form must reproduce the above copyright
12 # notice, this list of conditions and the following disclaimer in the
13 # documentation and/or other materials provided with the distribution.
14 # * Neither the name of the author nor the names of its contributors
15 # may be used to endorse or promote products derived from this software
16 # without specific prior written permission.
18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19 # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21 # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
22 # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23 # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24 # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25 # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26 # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27 # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
28 # THE POSSIBILITY OF SUCH DAMAGE.
30 import logging
31 import getopt
32 import sys
33 import socket
34 import _thread
35 import random
36 import string
37 import time
38 import os
40 import pptransport
41 import ppauto
42 from pp import Server
# Human-readable copyright notice, one holder per line.
# (The name shadows the interactive ``copyright`` builtin; it is kept because
# it is part of this module's public namespace.)
copyright = ("Copyright (c) 2009 Ondřej Súkup\n"
             "Copyright (c) 2005-2009 Vitalii Vanovschi. All rights reserved")
version = "3.0.0"

# SHA-1 constructor used for the client authentication handshake.
# hashlib is always available on Python 3, so the legacy ``sha`` module
# fallback (needed only for Python < 2.5) is no longer required.
import hashlib
sha_new = hashlib.sha1
class _NetworkServer(Server):
    """Network Server Class

    Extends ``Server`` with a TCP listening socket, a shared-secret
    challenge/response handshake for clients, an optional idle timeout and
    an optional auto-discovery broadcast thread.
    """

    def __init__(self, ncpus="autodetect", interface="0.0.0.0",
                 broadcast="255.255.255.255", port=None, secret=None,
                 timeout=None, loglevel=logging.WARNING, restart=False,
                 proto=0):
        """Creates the server and starts the idle watchdog if requested

        ncpus, secret, loglevel, restart, proto - forwarded unchanged to
                Server.__init__
        interface - address the listening socket is bound to
        broadcast - address used by the auto-discovery service
        port - TCP port to listen on; self.default_port is used when None
        timeout - seconds without any client connection after which the
                process exits; None disables the watchdog
        """
        Server.__init__(self, ncpus, secret=secret, loglevel=loglevel,
                        restart=restart, proto=proto)
        self.host = interface
        self.bcast = broadcast
        self.port = port if port is not None else self.default_port
        self.timeout = timeout
        # number of currently authenticated client connections
        self.ncon = 0
        # time of the most recent connect/disconnect event
        self.last_con_time = time.time()
        self.ncon_lock = _thread.allocate_lock()

        logging.debug("Strarting network server interface=%s port=%i"
                      % (self.host, self.port))
        if self.timeout is not None:
            logging.debug("ppserver will exit in %i seconds if no "
                          "connections with clients exist" % (self.timeout))
            _thread.start_new_thread(self.check_timeout, ())

    def ncon_add(self, val):
        """Keeps track of the number of connections and time of the last one"""
        # the context manager releases the lock even if an exception occurs
        with self.ncon_lock:
            self.ncon += val
            self.last_con_time = time.time()

    def check_timeout(self):
        """Checks if timeout happened and shutdowns server if it did"""
        while True:
            if self.ncon != 0:
                # clients are connected: check again one full period later
                time.sleep(self.timeout)
                continue
            idle_time = time.time() - self.last_con_time
            if idle_time < self.timeout:
                # sleep only for the remainder of the idle period
                time.sleep(self.timeout - idle_time)
            else:
                logging.debug("exiting ppserver due to timeout (no client"
                              " connections in last %i sec)", self.timeout)
                # os._exit terminates the whole process from this thread
                os._exit(0)

    def listen(self):
        """Initiates listening to incoming connections

        Binds to (self.host, self.port) and hands every accepted client
        socket to crun() in a new thread. Returns after logging an error
        when the socket cannot be set up, or once the accept loop is
        interrupted.
        """
        ssocket = None
        try:
            ssocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            # following allows ppserver to restart faster on the same port
            ssocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            ssocket.bind((self.host, self.port))
            ssocket.listen(5)
        except socket.error:
            logging.error("Cannot create socket with port " + str(self.port)
                          + " (port is already in use)")
            # nothing to accept on: release the socket and give up instead
            # of falling through to the accept loop
            if ssocket is not None:
                ssocket.close()
            return

        try:
            while True:
                # accept connections from outside and serve each one in
                # its own thread
                csocket, _address = ssocket.accept()
                _thread.start_new_thread(self.crun, (csocket, ))
        except (Exception, KeyboardInterrupt):
            # Ctrl-C or a socket failure ends the accept loop quietly
            logging.debug("Closing server socket")
        finally:
            ssocket.close()

    def crun(self, csocket):
        """Authenticates client and handles its jobs

        csocket - connected client socket as returned by accept()

        Handshake: the server sends its version and a random challenge and
        expects the hex SHA-1 digest of challenge + secret in return. After
        that the client selects a mode: "STAT" (statistics polling) or
        "EXEC" (job execution).
        """
        mysocket = pptransport.CSocketTransport(csocket)
        # send PP version
        mysocket.send(version)
        # generate a random challenge from the OS entropy source so that it
        # cannot be predicted from earlier challenges
        rng = random.SystemRandom()
        srandom = "".join(rng.choice(string.ascii_letters)
                          for _ in range(16))
        mysocket.send(srandom)
        # hashlib only accepts bytes on Python 3
        # NOTE(review): the client must hash the same UTF-8 encoded bytes -
        # confirm against the client-side implementation
        answer = sha_new((srandom + self.secret).encode("utf-8")).hexdigest()
        clientanswer = mysocket.receive()
        if answer != clientanswer:
            logging.warning("Authentification failed, client host=%s, port=%i"
                            % csocket.getpeername())
            mysocket.send("FAILED")
            csocket.close()
            return
        mysocket.send("OK")

        ctype = mysocket.receive()
        logging.debug("Control message received: " + ctype)
        self.ncon_add(1)
        try:
            if ctype == "STAT":
                # reset time at each new connection
                self.get_stats()["local"].time = 0.0
                mysocket.send(str(self.get_ncpus()))
                while True:
                    # every message from the client is a request for the
                    # accumulated local execution time
                    mysocket.receive()
                    mysocket.send(str(self.get_stats()["local"].time))
            elif ctype == "EXEC":
                while True:
                    sfunc = mysocket.creceive()
                    sargs = mysocket.receive()
                    fun = self.insert(sfunc, sargs)
                    sresult = fun(True)
                    mysocket.send(sresult)
        except Exception:
            # the serving loops only end through an exception, typically
            # when the client disconnects
            logging.debug("Closing client socket")
        finally:
            csocket.close()
            self.ncon_add(-1)

    def broadcast(self):
        """Initiates auto-discovery mechanism"""
        discover = ppauto.Discover(self)
        # discover.run receives the local and the broadcast (address, port)
        _thread.start_new_thread(discover.run,
                                 ((self.host, self.port),
                                  (self.bcast, self.port)))
def _config_option(config, section, option, getter="get"):
    """Returns config[section].<getter>(option), or None if it is unusable"""
    try:
        return getattr(config[section], getter)(option)
    except (KeyError, ValueError, AttributeError):
        # missing section/option or a value of the wrong type: the option
        # is simply treated as "not configured"
        return None


def parse_config(file_loc):
    """
    Parses a config file in a very forgiving way.

    file_loc - path of the configuration file

    Returns a tuple (args, autodiscovery): args is a new dict holding the
    server keyword arguments found in the file (options that are missing
    or malformed are left out), autodiscovery is a bool that is False
    unless the file enables it.

    Exits the process with status 1 when configobj is not installed or
    the file does not exist.
    """
    # If we don't have configobj installed then let the user know and exit
    try:
        from configobj import ConfigObj
    except ImportError:
        print("ERROR: You must have configobj installed to use "
              "configuration files. You can still use command line switches.",
              file=sys.stderr)
        sys.exit(1)

    if not os.access(file_loc, os.F_OK):
        print("ERROR: Can not access %s." % file_loc, file=sys.stderr)
        sys.exit(1)

    # Load the configuration file
    config = ConfigObj(file_loc)

    # (server argument, config section, config option, accessor)
    option_table = (
        ("secret", "general", "secret", "get"),
        ("interface", "network", "interface", "get"),
        ("broadcast", "network", "broadcast", "get"),
        ("port", "network", "port", "as_int"),
        ("ncpus", "general", "workers", "as_int"),
        ("proto", "general", "proto", "as_int"),
        ("restart", "general", "restart", "as_bool"),
        ("timeout", "network", "timeout", "as_int"),
    )

    args = {}
    # use each config item if it exists; if it doesn't then simply move
    # along so that the server default stays in effect
    for name, section, option, getter in option_table:
        value = _config_option(config, section, option, getter)
        if value is not None:
            args[name] = value

    # the server expects a logging level, not a bool
    debug = _config_option(config, "general", "debug", "as_bool")
    if debug is not None:
        args["loglevel"] = logging.DEBUG if debug else logging.WARNING

    autodiscovery = bool(
        _config_option(config, "network", "autodiscovery", "as_bool"))

    # Return a tuple of the args dict and autodiscovery variable
    return args, autodiscovery
def print_usage():
    """Writes the command line help text to standard output"""
    usage_lines = (
        "Parallel Python Network Server (pp-" + version + ")",
        "Usage: ppserver.py [-hdar] [-n proto] [-c config_path]"
        " [-i interface] [-b broadcast] [-p port] [-w nworkers]"
        " [-s secret] [-t seconds]",
        "",
        "Options: ",
        "-h : this help message",
        "-d : debug",
        "-a : enable auto-discovery service",
        "-r : restart worker process after each"
        " task completion",
        "-n proto : protocol number for pickle module",
        "-c path : path to config file",
        "-i interface : interface to listen",
        "-b broadcast : broadcast address for auto-discovery service",
        "-p port : port to listen",
        "-w nworkers : number of workers to start",
        "-s secret : secret for authentication",
        "-t seconds : timeout to exit if no connections with "
        "clients exist",
        "",
        "Due to the security concerns always use a non-trivial secret key.",
        "Secret key set by -s switch will override secret key assigned by",
        "pp_secret variable in .pythonrc.py",
        "",
        "Please visit http://www.parallelpython.com for extended up-to-date",
        "documentation, examples and support forums",
    )
    # one print call per entry; an empty entry yields a blank line
    for text in usage_lines:
        print(text)
if __name__ == "__main__":
    # Command line entry point: collect the server options, start the
    # server and block in its accept loop.
    try:
        opts, args = getopt.getopt(sys.argv[1:],
                                   "hdarn:c:b:i:p:w:s:t:", ["help"])
    except getopt.GetoptError:
        print_usage()
        sys.exit(1)

    # positional arguments are ignored; from here on args holds the keyword
    # arguments passed to _NetworkServer
    args = {}
    autodiscovery = False

    # switches whose value is stored as given / converted to int
    str_options = {"-i": "interface", "-s": "secret", "-b": "broadcast"}
    int_options = {"-p": "port", "-w": "ncpus", "-n": "proto",
                   "-t": "timeout"}

    for opt, arg in opts:
        if opt in ("-h", "--help"):
            print_usage()
            sys.exit()
        elif opt == "-c":
            # merge instead of rebinding so that switches given before -c
            # are kept unless the config file overrides them
            config_args, autodiscovery = parse_config(arg)
            args.update(config_args)
        elif opt == "-d":
            args["loglevel"] = logging.DEBUG
        elif opt == "-a":
            autodiscovery = True
        elif opt == "-r":
            args["restart"] = True
        elif opt in str_options:
            args[str_options[opt]] = arg
        elif opt in int_options:
            try:
                args[int_options[opt]] = int(arg)
            except ValueError:
                # report a bad number as a usage error, not a traceback
                print("ERROR: option %s expects an integer, got %r"
                      % (opt, arg), file=sys.stderr)
                print_usage()
                sys.exit(1)

    server = _NetworkServer(**args)
    if autodiscovery:
        server.broadcast()
    server.listen()
    # have to destroy it here explicitly otherwise an exception
    # comes out in Python 2.4
    del server
333 # Parallel Python Software: http://www.parallelpython.com