added autoproduce graph
[p2prfd.git] / p2prfd.py
bloba56f44e81a9ecb47323370745159f194c4845a6b
1 """P2PRFD HTTP proxy"""
3 import cgi
4 import hashlib
5 import helper
6 import http_helper
7 import httplib
8 import random
9 import socket
10 import sys
11 import threading
12 import urllib
13 import urlparse
15 import twisted.internet.reactor
16 import twisted.web.resource
17 import twisted.web.server
19 import entangled_network
class Observer:
    """Best-effort reporter of distribution-tree events to an observer.

    The observer is reached over HTTP at ``self.obs_addr``; that attribute is
    not set here and must be assigned by the caller before any notification.
    Transport failures are printed and swallowed, never raised.
    """

    def http_get(self, queries):
        """Send *queries* to the observer, printing (not raising) on failure."""
        target = self.obs_addr
        try:
            http_helper.http_get(target, queries)
        except Exception as ex:
            print('observer is off')
            print(str(ex))

    def notify_new_root(self, feed_url, root):
        """Report that node *root* has become the root for *feed_url*."""
        self.http_get({
            'action': 'new_root',
            'rss': feed_url,
            'root': repr(root),
        })

    def notify_adopt_child(self, feed_url, parent, child, loc):
        """Report that *parent* now holds *child* in slot *loc*.

        Callers in this file pass loc as 'left' or 'right', and child=None
        when a slot is being cleared.
        """
        self.http_get({
            'action': 'adopt_child',
            'rss': feed_url,
            'parent': repr(parent),
            'child': repr(child),
            'loc': loc,
        })
class P2prfdProxy(twisted.web.resource.Resource):
    """HTTP request handler for the P2PRFD proxy.

    Clients request a feed by using its URL as the request URI.  Content is
    served from ``cache_data``; on a miss the node joins the feed's
    distribution tree (``insert_new_feed``) and answers once content arrives.

    Per-feed state lives in dicts keyed by feed URL.  Node values are the
    (ip, port, node_id) tuples produced by ``get_me``.  NOTE(review): the
    dicts are class attributes and would be shared by all instances; the
    startup script in this file creates a single instance, so this works.

    Attributes set by the startup script rather than here: ``dht_network``,
    ``p2prfd_http_port``, ``node_id`` and ``observer``.
    """
    isLeaf = True
    cache_data = {}   # feed_url -> most recently received content
    parent_of = {}    # feed_url -> parent node; root_update() treats a
                      # missing entry as "this node is the root"
    ip_addr = {}      # feed_url -> local IP address used for that feed
    child_left = {}   # feed_url -> left child node
    child_right = {}  # feed_url -> right child node
    update_interval = 63       # seconds between update() passes
    root_update_interval = 60  # seconds between root_update() passes

    def generate_id(self):
        """Return a random 7-hex-character node identifier."""
        digest = hashlib.sha1()
        digest.update(str(random.random()))
        return digest.hexdigest()[:7]

    def set_root(self, feed_url):
        """Advertise this node in the DHT as the root for *feed_url*."""
        me = self.get_me(feed_url)
        self.dht_network.set_value('root:' + feed_url, repr(me))
        self.observer.notify_new_root(feed_url, me)

    def insert_new_feed(self, feed_url, request=None):
        """Join the distribution tree for *feed_url* and cache its content.

        The feed's root is looked up in the DHT under ``'root:' + feed_url``.
        If a root is found, this node asks it for a parent (retrying until an
        insert succeeds), registers as that parent's child and copies the
        content from it.  If there is no root, or joining fails, the content
        is fetched from the origin server and this node registers as root.
        When *request* is given, the content is written to it and the request
        is finished.
        """
        import ast

        print('inserting new feed %s' % (feed_url))

        def root_gotten(root):
            if type(root) is str:
                try:
                    # The DHT value is repr() of a node tuple (see set_root).
                    # It arrives from the network, so parse it as a literal
                    # rather than eval()-ing arbitrary code.
                    root_node = ast.literal_eval(root)
                    have_new_parent = False
                    while not have_new_parent:
                        print('getting parent')
                        parent = self.get_parent_from_root(root_node, feed_url)
                        print('parent = %r' % (parent,))
                        have_new_parent = self.insert_to_parent(parent, feed_url)
                        print('insert status = %r' % (have_new_parent,))
                        if not have_new_parent:
                            print('FAILED inserting')
                    self.parent_of[feed_url] = parent
                    data = self.get_data_from_parent(parent, feed_url)
                except Exception as ex:
                    print('root error')
                    print(str(ex))
                    # We are about to claim root: drop any parent recorded
                    # above so root_update() refreshes this feed.
                    self.parent_of.pop(feed_url, None)
                    data = self.get_data_from_server(feed_url)
                    self.set_root(feed_url)
            elif root is None:
                # Nobody serves this feed yet: fetch it directly and tell the
                # DHT that this node is its root.
                data = self.get_data_from_server(feed_url)
                self.set_root(feed_url)
            else:
                data = 'should not contain this'
            if request:
                request.write(data)
                request.finish()
            self.cache_data[feed_url] = data

        parsed = urlparse.urlparse(feed_url)
        # Connect to the feed's host only to learn which local address the OS
        # picks for it; that address is what this node advertises for the
        # feed.  hostname (unlike netloc) excludes any ":port" suffix.
        sock = socket.socket()
        try:
            sock.connect((parsed.hostname, parsed.port or 80))
            self.ip_addr[feed_url] = sock.getsockname()[0]
        finally:
            sock.close()

        self.dht_network.get_value('root:' + feed_url, root_gotten)

    def notify_node(self, node, feed_url, content):
        """Push *content* for *feed_url* to *node*; return its 'status'."""
        return self.http_rpc(node, 'notify_update', feed_url, 'status', content)

    def try_connection(self, try_actions, except_actions):
        """Run try_actions(); if it raises, print the error and run except_actions()."""
        try:
            try_actions()
        except Exception as exc:
            print('exception: %s' % (exc,))
            except_actions()

    def _contact_children(self, feed_url, contact):
        """Call contact(child) for each child of *feed_url*, left then right.

        A child whose contact raises is removed from its slot and the
        observer is told that the slot is now empty.
        """
        for side, children in (('left', self.child_left),
                               ('right', self.child_right)):
            child = children.get(feed_url)
            if not child:
                continue
            try:
                contact(child)
            except Exception as exc:
                print('exception: %s' % (exc,))
                print('%s child %s error' % (side, repr(child)))
                self.observer.notify_adopt_child(
                    feed_url, self.get_me(feed_url), None, side)
                helper.del_key(children, feed_url)

    def notify_children(self, feed_url):
        """Push the cached content of *feed_url* to this node's children."""
        print('notify children %s' % (feed_url))
        content = self.cache_data[feed_url]
        self._contact_children(
            feed_url,
            lambda child: self.notify_node(child, feed_url, content))

    def update_feed(self, feed_url):
        """Refresh the cached content of *feed_url*, then push it to children.

        With a parent, content is copied from it; if that fails the parent
        is forgotten, the cache is set to 'parent error' and the node rejoins
        the tree.  Without a parent, content comes from the origin server.
        Scheduled through reactor.callInThread by update()/root_update().
        """
        if feed_url in self.parent_of:
            parent = self.parent_of[feed_url]
            try:
                self.cache_data[feed_url] = self.get_data_from_parent(parent, feed_url)
            except Exception:
                print('parent %s error' % (repr(parent)))
                self.cache_data[feed_url] = 'parent error'
                del self.parent_of[feed_url]
                # insert_new_feed drives the DHT through the reactor, which is
                # not thread-safe; run it on the reactor thread.
                twisted.internet.reactor.callFromThread(self.insert_new_feed, feed_url)
        else:
            self.cache_data[feed_url] = self.get_data_from_server(feed_url)
        self.notify_children(feed_url)

    def ping_node(self, node, feed_url):
        """Ping *node* about *feed_url*; return its 'status' reply."""
        return self.http_rpc(node, 'ping', feed_url, 'status')

    def check_children(self, feed_url):
        """Ping both children of *feed_url*, dropping any that fail."""
        print('checking children')
        self._contact_children(
            feed_url, lambda child: self.ping_node(child, feed_url))

    def root_update(self):
        """Refresh every feed this node is root for, then reschedule itself."""
        print('root updating')
        # Iterate a snapshot: worker threads write cache_data concurrently.
        for feed_url in list(self.cache_data):
            if feed_url not in self.parent_of:
                print('updating %s' % (feed_url))
                twisted.internet.reactor.callInThread(self.update_feed, feed_url)
        twisted.internet.reactor.callLater(self.root_update_interval, self.root_update)

    def update(self):
        """Refresh every cached feed and check its children, then reschedule.

        The per-feed work goes to the reactor thread pool because it performs
        blocking network I/O.
        """
        print('non-root updating')
        for feed_url in list(self.cache_data):
            print('updating %s' % (feed_url))
            twisted.internet.reactor.callInThread(self.update_feed, feed_url)
            twisted.internet.reactor.callInThread(self.check_children, feed_url)
        twisted.internet.reactor.callLater(self.update_interval, self.update)

    def http_get(self, target, queries):
        """GET ``/?<queries>`` from target=(host, port) and parse the reply.

        The reply is expected to be the repr() of a literal (the format that
        P2prfdResource returns).
        """
        import ast

        http_con = httplib.HTTPConnection(target[0], target[1])
        try:
            http_con.request("GET", '/?' + urllib.urlencode(queries))
            resp_str = http_con.getresponse().read()
        finally:
            http_con.close()
        # Never eval() what a remote peer sent.
        return ast.literal_eval(resp_str)

    def get_data_from_parent(self, parent, key):
        """Fetch the cached content of feed *key* from node *parent*."""
        print('fetching from parent %s' % (repr(parent)))
        return self.http_rpc(parent, 'get_rss', key, 'data')

    def get_data_from_server(self, addr):
        """Fetch the resource at URL *addr* directly from its origin server."""
        parse_result = urlparse.urlparse(addr)
        print('fetching directly from %s' % (addr))
        # A URL such as 'http://host' has an empty path, but the request line
        # still needs one.
        path = parse_result.path or '/'
        if parse_result.query:
            path += '?' + parse_result.query
        http_con = httplib.HTTPConnection(parse_result.netloc)
        try:
            http_con.request("GET", path)
            return http_con.getresponse().read()
        finally:
            http_con.close()

    def get_parent_from_root(self, root, feed_url, origin=None):
        """Ask node *root* to pick a parent for *origin* (default: this node)."""
        return self.http_rpc(root, 'get_parent', feed_url, 'parent', None, origin)

    def http_rpc(self, target, action, feed_url, ret_key, data=None, origin=None):
        """Invoke *action* on node *target* and return ``reply[ret_key]``.

        The request is a GET, or a POST when *data* is given.  'from' and
        'addr' identify this node; 'origin' defaults to this node.
        """
        me = self.get_me(feed_url)
        queries = {
            'action': action,
            'rss': feed_url,
            'node_id': self.node_id,
            'from': me,
            'origin': me if origin is None else origin,
            'addr': repr(me),
        }
        if data is None:
            result = http_helper.http_get(target, queries)
        else:
            queries['data'] = data
            result = http_helper.http_post(target, queries)
        return result[ret_key]

    def insert_to_parent(self, parent, feed_url):
        """Ask *parent* to adopt this node as a child; return its 'status'."""
        print('insert_to_parent %s' % (repr(parent)))
        return self.http_rpc(parent, 'insert_child', feed_url, 'status')

    def get_me(self, feed_url):
        """Return this node's (ip, port, node_id) tuple for *feed_url*."""
        return (self.ip_addr[feed_url], self.p2prfd_http_port, self.node_id)

    def render_GET(self, request):
        """Serve the feed named by request.uri, from cache when possible."""
        if request.uri in self.cache_data:
            data = self.cache_data[request.uri]
            print('url %s found in cache' % request.uri)
            return data
        # Cache miss: join the tree; insert_new_feed finishes the request.
        self.insert_new_feed(request.uri, request)
        return twisted.web.server.NOT_DONE_YET
class P2prfdResource(twisted.web.resource.Resource):
    """Node-to-node control endpoint of a P2PRFD node.

    Peers call it with ``?action=...`` GET queries, or with a
    ``notify_update`` POST.  Every reply is the repr() of a result dict that
    always carries 'status' and 'data' plus action-specific keys.  Node state
    lives on ``self.rsrc``, the P2prfdProxy assigned by the startup script.
    """
    isLeaf = True

    # GET 'action' value -> name of the handler method.
    _GET_ACTIONS = {
        'get_rss': '_handle_get_rss',
        'get_parent': '_handle_get_parent',
        'get_node_info': '_handle_get_node_info',
        'insert_child': '_handle_insert_child',
        'ping': '_handle_ping',
    }

    def get_node_info(self, node, feed_url):
        """Ask *node* for its subtree depths for *feed_url*; return the info dict."""
        return self.rsrc.http_rpc(node, 'get_node_info', feed_url, 'info')

    def _children(self, feed_url):
        """Return (left, right) children of *feed_url*; None marks an empty slot."""
        return (self.rsrc.child_left.get(feed_url),
                self.rsrc.child_right.get(feed_url))

    def _probe_child(self, feed_url, side, child):
        """Return get_node_info() of *child*, or None when absent or unreachable.

        An unreachable child is removed from slot *side* and the observer is
        told that the slot is empty.
        """
        if not child:
            return None
        try:
            return self.get_node_info(child, feed_url)
        except Exception as exc:
            print('exceptione: %s' % (exc,))
            print('%s child %s error' % (side, repr(child)))
            self.rsrc.observer.notify_adopt_child(
                feed_url, self.rsrc.get_me(feed_url), None, side)
            slots = self.rsrc.child_left if side == 'left' else self.rsrc.child_right
            helper.del_key(slots, feed_url)
            return None

    def render_POST(self, request):
        """Handle 'notify_update': cache the pushed content and forward it."""
        queries = request.args
        action = queries['action'][0] if 'action' in queries else ''
        result = {'status': False, 'data': 'no data found'}
        if action == 'notify_update':
            feed_url = queries['rss'][0]
            print('received update for %s' % feed_url)
            self.rsrc.cache_data[feed_url] = queries['data'][0]
            # notify_children does blocking HTTP; keep it off the reactor thread.
            twisted.internet.reactor.callInThread(self.rsrc.notify_children, feed_url)
            result['status'] = True
        return repr(result)

    def render_GET(self, request):
        """Dispatch a GET control request on its 'action' query parameter."""
        queries = urlparse.parse_qs(urlparse.urlparse(request.uri).query)
        action = queries['action'][0] if 'action' in queries else ''
        result = {'status': False, 'data': 'no data found'}
        handler_name = self._GET_ACTIONS.get(action)
        if handler_name is not None:
            getattr(self, handler_name)(queries, result)
        return repr(result)

    def _handle_get_rss(self, queries, result):
        """Return the cached content of the feed, if this node has it."""
        feed_url = queries['rss'][0]
        if feed_url in self.rsrc.cache_data:
            result['status'] = True
            result['data'] = self.rsrc.cache_data[feed_url]

    def _handle_get_parent(self, queries, result):
        """Choose a parent in this node's subtree for the node in 'origin'."""
        import ast

        feed_url = queries['rss'][0]
        child_left, child_right = self._children(feed_url)
        me = self.rsrc.get_me(feed_url)
        # Query values come from the network: parse literals, never eval().
        origin = ast.literal_eval(queries['origin'][0])
        if origin in (child_left, child_right):
            print('ONE OF CHILD IS FOUND TO BE THE SAME AS GET_PARENT ASKER')
            result['status'] = True
            result['parent'] = me
            return
        left_info = self._probe_child(feed_url, 'left', child_left)
        right_info = self._probe_child(feed_url, 'right', child_right)
        if left_info is not None and right_info is not None:
            # Both slots taken: delegate to the child with the shallower
            # minimum depth (left wins ties).
            if left_info['min_depth'] <= right_info['min_depth']:
                parent = self.rsrc.get_parent_from_root(child_left, feed_url, origin)
            else:
                parent = self.rsrc.get_parent_from_root(child_right, feed_url, origin)
        else:
            # A slot is free here, so the asker can attach to this node.
            parent = me
        result['status'] = True
        result['parent'] = parent
        print('left child is %s' % (repr(child_left if left_info is not None else None)))
        print('right child is %s' % (repr(child_right if right_info is not None else None)))

    def _handle_get_node_info(self, queries, result):
        """Report this subtree's max and min depth, queried recursively.

        Missing or unreachable children count as depth 0.  An unreachable
        child is dropped here, so it is not blamed on this node by the caller.
        """
        feed_url = queries['rss'][0]
        child_left, child_right = self._children(feed_url)
        empty = {'depth': 0, 'min_depth': 0}
        left_info = self._probe_child(feed_url, 'left', child_left) or empty
        right_info = self._probe_child(feed_url, 'right', child_right) or empty
        depth = max(left_info['depth'], right_info['depth']) + 1
        min_depth = min(left_info['min_depth'], right_info['min_depth']) + 1
        print('depth is %s' % (depth))
        result['status'] = True
        result['info'] = {'depth': depth, 'min_depth': min_depth}

    def _handle_insert_child(self, queries, result):
        """Adopt the node in 'from' into the first free child slot."""
        import ast

        # 'from' is repr() of the caller's (ip, port, node_id); parse it as a
        # literal since it comes from the network.
        from_node = ast.literal_eval(queries['from'][0])
        if from_node[2] == self.rsrc.node_id:
            # The insert request came from this very node.
            # NOTE(review): sys.exit() raises SystemExit inside the request
            # handler; kept as in the original.
            print('error connecting, try a moment later')
            sys.exit()
        feed_url = queries['rss'][0]
        if from_node in self._children(feed_url):
            # Already one of our children: report success again.
            result['status'] = True
            return
        print('inserting child %s' % (repr(from_node)))
        for side, slots in (('left', self.rsrc.child_left),
                            ('right', self.rsrc.child_right)):
            if feed_url not in slots:
                slots[feed_url] = from_node
                self.rsrc.observer.notify_adopt_child(
                    feed_url, self.rsrc.get_me(feed_url), from_node, side)
                result['status'] = True
                return
        # Both slots occupied.
        result['status'] = False

    def _handle_ping(self, queries, result):
        """Answer a liveness check with this node's id."""
        result['status'] = True
        result['node_id'] = self.rsrc.node_id
# Startup.  Command line:
#   <dht_port> <proxy_port> <p2prfd_port> [<known_host> <known_port>] <observer_ip>
# argv[1] is the port handed to EntangledNetwork, argv[2] the port of the
# client-facing proxy (P2prfdProxy), argv[3] the port of the node-to-node
# endpoint (P2prfdResource).  The optional known node is passed to
# EntangledNetwork (presumably to join an existing DHT).
if len(sys.argv) < 5:
    sys.stderr.write('usage: %s <dht_port> <proxy_port> <p2prfd_port> '
                     '[<known_host> <known_port>] <observer_ip>\n'
                     % sys.argv[0])
    sys.exit(1)
rsrc = P2prfdProxy()
if len(sys.argv) == 7:
    known_node = (sys.argv[4], int(sys.argv[5]))
    obs_ip = sys.argv[6]
else:
    known_node = None
    obs_ip = sys.argv[4]
rsrc.dht_network = entangled_network.EntangledNetwork(int(sys.argv[1]), known_node)
rsrc.p2prfd_http_port = int(sys.argv[3])
rsrc.node_id = rsrc.generate_id()
rsrc.observer = Observer()
rsrc.observer.obs_addr = (obs_ip, 9501)  # the observer's port is fixed
site = twisted.web.server.Site(rsrc)
p2prfd_rsrc = P2prfdResource()
p2prfd_rsrc.rsrc = rsrc
p2prfd_site = twisted.web.server.Site(p2prfd_rsrc)
# NOTE(review): only the first scheduled calls are stored; update() and
# root_update() reschedule themselves with fresh callLater handles.
update_timer = twisted.internet.reactor.callLater(rsrc.update_interval, rsrc.update)
root_update_timer = twisted.internet.reactor.callLater(rsrc.root_update_interval, rsrc.root_update)
rsrc.update_timer = update_timer
rsrc.root_update_timer = root_update_timer
twisted.internet.reactor.listenTCP(int(sys.argv[2]), site)
twisted.internet.reactor.listenTCP(rsrc.p2prfd_http_port, p2prfd_site)
twisted.internet.reactor.run()