disabled some debug comments
[p2prfd.git] / p2prfd.py
blob215e3c36279aafe1dde2791f7114fb452d5c46aa
1 """P2PRFD HTTP proxy"""
import ast
import cgi
import hashlib
import httplib
import random
import socket
import sys
import threading
import urllib
import urlparse

import twisted.internet.reactor
import twisted.web.resource
import twisted.web.server

import entangled_network
import helper
import http_helper
class Observer:
    """Best-effort reporter of tree changes to an external observer service.

    Every notification is sent with ``http_helper.http_get`` to
    ``obs_addr``.  Failures are printed and otherwise ignored, so an
    unreachable (or unconfigured) observer never disturbs the node.
    """

    def __init__(self, obs_addr=None):
        # Target handed to http_helper.http_get (the start-up script assigns
        # an (ip, port) pair).  None means no observer: notifications are skipped.
        self.obs_addr = obs_addr

    def http_get(self, queries):
        """Send *queries* to the observer, printing (not raising) any error."""
        if self.obs_addr is None:
            # Nothing configured; reading the address used to sit outside
            # the try and escape as AttributeError.
            return
        try:
            http_helper.http_get(self.obs_addr, queries)
        except Exception as ex:
            print('observer is off')
            print(str(ex))

    def notify_new_root(self, feed_url, root):
        """Report that node *root* is now the tree root for *feed_url*."""
        self.http_get({
            'action': 'new_root',
            'rss': feed_url,
            'root': repr(root),
        })

    def notify_adopt_child(self, feed_url, parent, child, loc):
        """Report that *parent* now holds *child* in its *loc* slot.

        Callers in this module pass ``'left'`` or ``'right'`` as *loc*, and
        ``None`` as *child* when the slot has been vacated.
        """
        self.http_get({
            'action': 'adopt_child',
            'rss': feed_url,
            'parent': repr(parent),
            'child': repr(child),
            'loc': loc,
        })
class P2prfdProxy(twisted.web.resource.Resource):
    """HTTP request handler for P2PRFD proxy.

    Clients use this resource as an HTTP proxy: ``request.uri`` is taken as
    the feed URL.  For every feed the nodes form a binary tree whose root is
    advertised in the DHT under ``'root:' + feed_url``.  Each node caches the
    feed and pulls updates from its parent; the root pulls from the origin
    server.  Each node then pushes the content to its left/right children.

    The start-up script assigns ``dht_network``, ``p2prfd_http_port``,
    ``node_id`` and ``observer`` on the instance after construction.
    """
    isLeaf = True
    # NOTE(review): these per-feed tables are class attributes and therefore
    # shared by all instances; the start-up script only creates one.
    cache_data = {}   # feed_url -> last fetched feed content
    parent_of = {}    # feed_url -> parent node (absent when this node is root)
    ip_addr = {}      # feed_url -> local IP used to reach the feed's host
    child_left = {}   # feed_url -> left child node
    child_right = {}  # feed_url -> right child node
    update_interval = 9       # seconds between update() passes
    root_update_interval = 5  # seconds between root_update() passes

    def generate_id(self):
        """Return a random 7-hex-digit node identifier (not collision-proof)."""
        digest = hashlib.sha1(str(random.random())).hexdigest()
        return digest[:7]

    def set_root(self, feed_url):
        """Advertise this node as root of *feed_url* in the DHT and tell the observer."""
        me = self.get_me(feed_url)
        self.dht_network.set_value('root:' + feed_url, repr(me))
        self.observer.notify_new_root(feed_url, me)

    def _local_ip_for(self, feed_url):
        """Return the local IP address the OS uses to reach the feed's host."""
        parsed = urlparse.urlparse(feed_url)
        # hostname/port rather than netloc: netloc keeps any ":port" suffix,
        # which socket.connect() cannot resolve as a host name.
        host = parsed.hostname or ''
        port = parsed.port or 80
        sock = socket.socket()
        try:
            # NOTE(review): blocking connect without timeout; when reached
            # from render_GET this runs on the reactor thread.
            sock.connect((host, port))
            return sock.getsockname()[0]
        finally:
            sock.close()

    def _join_under_root(self, root_repr, feed_url):
        """Attach below the advertised root and return the feed content.

        Falls back to the origin server, and takes over as root, when the
        advertised root cannot be parsed or reached.
        """
        try:
            # The DHT value is written by peers; parse it as a plain literal
            # instead of eval() so a hostile peer cannot execute code here.
            root = ast.literal_eval(root_repr)
            print('getting parent')
            parent = self.get_parent_from_root(root, feed_url)
            print('parent = %s' % (repr(parent),))
            self.insert_to_parent(parent, feed_url)
            self.parent_of[feed_url] = parent
            return self.get_data_from_parent(parent, feed_url)
        except Exception as ex:
            print('root error')
            print(str(ex))
            data = self.get_data_from_server(feed_url)
            self.set_root(feed_url)
            return data

    def insert_new_feed(self, feed_url, request=None):
        """Join the distribution tree for *feed_url* and cache its content.

        Looks the feed's root up in the DHT.  If a root is advertised, this
        node asks it for a parent, registers with that parent and fetches
        the content from it.  If that fails, or no root exists, the content
        comes from the origin server and this node becomes the root.  When
        *request* is given, the content is written to it and it is finished.
        """
        print('inserting new feed %s' % (feed_url))

        def root_gotten(root):
            if isinstance(root, str):
                data = self._join_under_root(root, feed_url)
            elif root is None:
                data = self.get_data_from_server(feed_url)
                # tell the DHT that I am the root of this feed
                self.set_root(feed_url)
            else:
                data = 'should not contain this'
            if request:
                request.write(data)
                request.finish()
            # cache the data
            self.cache_data[feed_url] = data

        self.ip_addr[feed_url] = self._local_ip_for(feed_url)
        # update_feed() reaches this method from a reactor worker thread
        # (callInThread) and Twisted APIs are not thread-safe, so the DHT
        # lookup is always scheduled onto the reactor thread.
        twisted.internet.reactor.callFromThread(
            self.dht_network.get_value, 'root:' + feed_url, root_gotten)

    def notify_node(self, node, feed_url, content):
        """POST fresh *content* of *feed_url* to *node*; return its status."""
        return self.http_rpc(node, 'notify_update', feed_url, 'status', content)

    def _forget_child(self, feed_url, side):
        """Drop the *side* ('left'/'right') child of *feed_url* and report the vacancy."""
        table = self.child_left if side == 'left' else self.child_right
        self.observer.notify_adopt_child(feed_url, self.get_me(feed_url), None, side)
        helper.del_key(table, feed_url)

    def _probe_children(self, feed_url, probe):
        """Call probe(child) for each child of *feed_url*; drop those that fail."""
        for side, table in (('left', self.child_left), ('right', self.child_right)):
            child = table.get(feed_url)
            if not child:
                continue
            try:
                probe(child)
            except Exception:
                print('%s child %s error' % (side, repr(child)))
                self._forget_child(feed_url, side)

    def notify_children(self, feed_url):
        """Push the cached content of *feed_url* to both children."""
        print('notify children %s' % (feed_url))
        content = self.cache_data[feed_url]
        self._probe_children(
            feed_url, lambda child: self.notify_node(child, feed_url, content))

    def update_feed(self, feed_url):
        """Refresh the cached copy of *feed_url* and push it to the children.

        A node with a parent pulls from it.  If the parent fails, the node
        forgets it and re-joins the tree via insert_new_feed().  A node
        without a parent fetches from the origin server.
        """
        if feed_url in self.parent_of:
            parent = self.parent_of[feed_url]
            try:
                self.cache_data[feed_url] = self.get_data_from_parent(parent, feed_url)
            except Exception:
                print('parent %s error' % (repr(parent)))
                self.cache_data[feed_url] = 'parent error'
                # pop(): a concurrent update_feed thread may already have removed it
                self.parent_of.pop(feed_url, None)
                self.insert_new_feed(feed_url)
        else:
            self.cache_data[feed_url] = self.get_data_from_server(feed_url)
        self.notify_children(feed_url)

    def ping_node(self, node, feed_url):
        """Ping *node*; return its status."""
        return self.http_rpc(node, 'ping', feed_url, 'status')

    def check_children(self, feed_url):
        """Ping both children of *feed_url*, dropping any that do not answer."""
        print('checking children')
        self._probe_children(feed_url, lambda child: self.ping_node(child, feed_url))

    def root_update(self):
        """Refresh, in worker threads, every feed this node is root of; reschedule."""
        print('root updating')
        for feed_url in self.cache_data:
            if feed_url not in self.parent_of:
                print('updating %s' % (feed_url))
                twisted.internet.reactor.callInThread(self.update_feed, feed_url)
        twisted.internet.reactor.callLater(self.root_update_interval, self.root_update)

    def update(self):
        """Refresh every cached feed and ping its children (worker threads); reschedule."""
        print('non-root updating')
        # NOTE(review): despite the log text this also refreshes feeds this
        # node is root of, which root_update() refreshes as well.
        for feed_url in self.cache_data:
            print('updating %s' % (feed_url))
            twisted.internet.reactor.callInThread(self.update_feed, feed_url)
            twisted.internet.reactor.callInThread(self.check_children, feed_url)
        twisted.internet.reactor.callLater(self.update_interval, self.update)

    def http_get(self, target, queries):
        """GET ``/?<queries>`` from ``(host, port)`` *target*; parse the body as a literal."""
        http_con = httplib.HTTPConnection(target[0], target[1])
        try:
            http_con.request("GET", '/?' + urllib.urlencode(queries))
            resp_str = http_con.getresponse().read()
        finally:
            http_con.close()
        # Peers reply with repr() of plain data; literal_eval parses that
        # without executing code received from the network.
        return ast.literal_eval(resp_str)

    def get_data_from_parent(self, parent, key):
        """Fetch the cached content of feed *key* from *parent*."""
        print('fetching from parent %s' % (repr(parent)))
        return self.http_rpc(parent, 'get_rss', key, 'data')

    def get_data_from_server(self, addr):
        """get a resource directly from addr"""
        parsed = urlparse.urlparse(addr)
        print('fetching directly from %s' % (addr))
        # "http://host" has an empty path, which is not a valid request target.
        path = parsed.path or '/'
        if parsed.query:
            path += '?' + parsed.query
        http_con = httplib.HTTPConnection(parsed.netloc)
        try:
            http_con.request("GET", path)
            return http_con.getresponse().read()
        finally:
            http_con.close()

    def get_parent_from_root(self, root, feed_url):
        """find a good parent from this root"""
        return self.http_rpc(root, 'get_parent', feed_url, 'parent')

    def http_rpc(self, target, action, feed_url, ret_key, data=None):
        """Invoke *action* for *feed_url* on peer *target*; return ``result[ret_key]``.

        Uses http_helper.http_get, or http_helper.http_post when *data* is
        given (the payload travels as the 'data' field).
        """
        me = self.get_me(feed_url)
        queries = {
            'action': action,
            'rss': feed_url,
            'node_id': self.node_id,
            # (ip, port, node_id); the insert_child handler parses this back.
            'from': me,
            'addr': repr(me),
        }
        if data is None:
            result = http_helper.http_get(target, queries)
        else:
            queries['data'] = data
            result = http_helper.http_post(target, queries)
        return result[ret_key]

    def insert_to_parent(self, parent, feed_url):
        """Ask *parent* to adopt this node as a child; return its status."""
        print('insert_to_parent %s' % (repr(parent)))
        return self.http_rpc(parent, 'insert_child', feed_url, 'status')

    def get_me(self, feed_url):
        """Return this node's ``(ip, port, node_id)`` address for *feed_url*."""
        return (self.ip_addr[feed_url], self.p2prfd_http_port, self.node_id)

    def render_GET(self, request):
        """Serve a proxied feed: from cache, or asynchronously after joining its tree."""
        feed_url = request.uri
        if feed_url in self.cache_data:
            print('url %s found in cache' % feed_url)
            return self.cache_data[feed_url]
        self.insert_new_feed(feed_url, request)
        return twisted.web.server.NOT_DONE_YET
class P2prfdResource(twisted.web.resource.Resource):
    """Peer-to-peer control endpoint of a P2PRFD node.

    Answers RPCs sent by other nodes.  Every reply is the repr() of a dict
    holding at least 'status'; further keys depend on the action.  ``rsrc``
    (the node's P2prfdProxy) is assigned by the start-up script.
    """
    isLeaf = True

    def get_node_info(self, node, feed_url):
        """Ask *node* for its subtree info dict ('depth', 'min_depth')."""
        return self.rsrc.http_rpc(node, 'get_node_info', feed_url, 'info')

    def render_POST(self, request):
        """Handle POST RPCs; only 'notify_update' is supported."""
        queries = request.args
        action = queries.get('action', [''])[0]
        result = {'status': False, 'data': 'no data found'}
        if action == 'notify_update':
            feed_url = queries['rss'][0]
            print('received update for %s' % feed_url)
            self.rsrc.cache_data[feed_url] = queries['data'][0]
            # notify_children performs blocking HTTP; keep it off the reactor thread.
            twisted.internet.reactor.callInThread(self.rsrc.notify_children, feed_url)
            result['status'] = True
        return repr(result)

    def render_GET(self, request):
        """Dispatch a GET RPC on its 'action' query parameter."""
        queries = cgi.parse_qs(urlparse.urlparse(request.uri).query)
        action = queries.get('action', [''])[0]
        result = {'status': False, 'data': 'no data found'}
        handlers = {
            'get_rss': self._handle_get_rss,
            'get_parent': self._handle_get_parent,
            'get_node_info': self._handle_get_node_info,
            'insert_child': self._handle_insert_child,
            'ping': self._handle_ping,
        }
        handler = handlers.get(action)
        if handler is not None:
            handler(queries, result)
        return repr(result)

    def _live_child_info(self, feed_url, side):
        """Return (child, info) for the *side* child of *feed_url*.

        Returns (None, None) when there is no such child or it fails to
        answer; a failing child is dropped and reported to the observer.
        """
        table = self.rsrc.child_left if side == 'left' else self.rsrc.child_right
        child = table.get(feed_url)
        if not child:
            return None, None
        try:
            return child, self.get_node_info(child, feed_url)
        except Exception:
            print('%s child %s error' % (side, repr(child)))
            self.rsrc.observer.notify_adopt_child(
                feed_url, self.rsrc.get_me(feed_url), None, side)
            helper.del_key(table, feed_url)
            return None, None

    def _handle_get_rss(self, queries, result):
        """Return the cached content of the feed, if this node has it."""
        feed_url = queries['rss'][0]
        if feed_url in self.rsrc.cache_data:
            result['status'] = True
            result['data'] = self.rsrc.cache_data[feed_url]

    def _handle_get_parent(self, queries, result):
        """Choose the node a newcomer should attach to within this subtree.

        With both children alive the question is forwarded to the child
        whose subtree has the smaller 'min_depth' (ties go left); otherwise
        this node has a free slot and answers with itself.
        """
        feed_url = queries['rss'][0]
        me = self.rsrc.get_me(feed_url)
        child_left, left_info = self._live_child_info(feed_url, 'left')
        child_right, right_info = self._live_child_info(feed_url, 'right')
        if child_left and child_right:
            # NOTE(review): synchronous HTTP from the reactor thread.
            if left_info['min_depth'] <= right_info['min_depth']:
                parent = self.rsrc.get_parent_from_root(child_left, feed_url)
            else:
                parent = self.rsrc.get_parent_from_root(child_right, feed_url)
        else:
            parent = me
        result['status'] = True
        result['parent'] = parent
        print('left child is %s' % (repr(child_left)))
        print('right child is %s' % (repr(child_right)))

    def _handle_get_node_info(self, queries, result):
        """Report this node's subtree metrics for the feed.

        'depth' is the subtree height (a childless node has depth 1).
        'min_depth' is 1 when this node has a free child slot, otherwise 1 +
        the smaller of its children's min_depth.  A child that fails to
        answer is dropped and counted as a free slot instead of failing
        this whole RPC.
        """
        feed_url = queries['rss'][0]
        _, left_info = self._live_child_info(feed_url, 'left')
        _, right_info = self._live_child_info(feed_url, 'right')
        left_info = left_info or {'depth': 0, 'min_depth': 0}
        right_info = right_info or {'depth': 0, 'min_depth': 0}
        depth = max(left_info['depth'], right_info['depth']) + 1
        min_depth = min(left_info['min_depth'], right_info['min_depth']) + 1
        print('depth is %s' % (depth))
        result['status'] = True
        result['info'] = {'depth': depth, 'min_depth': min_depth}

    def _handle_insert_child(self, queries, result):
        """Adopt the requesting node into the first free child slot (left first)."""
        # 'from' holds the text of an (ip, port, node_id) tuple received from
        # the network: parse it as a literal, never with eval().
        addr = ast.literal_eval(queries['from'][0])
        if addr[2] == self.rsrc.node_id:
            # A node must never adopt itself.  Refuse (status stays False)
            # rather than sys.exit(), which would raise SystemExit in the
            # middle of request handling.
            print('error connecting, try a moment later')
            return
        feed_url = queries['rss'][0]
        print('inserting child %s' % (repr(addr)))
        for side, table in (('left', self.rsrc.child_left),
                            ('right', self.rsrc.child_right)):
            if feed_url not in table:
                table[feed_url] = addr
                self.rsrc.observer.notify_adopt_child(
                    feed_url, self.rsrc.get_me(feed_url), addr, side)
                result['status'] = True
                return
        result['status'] = False

    def _handle_ping(self, queries, result):
        """Liveness check; also reports this node's id."""
        result['status'] = True
        result['node_id'] = self.rsrc.node_id
# Start-up.  Command line:
#   p2prfd.py ARG1 PROXY_PORT P2P_PORT [KNOWN_HOST KNOWN_PORT] OBSERVER_IP
# ARG1 is passed to EntangledNetwork (presumably the DHT port), and the
# optional KNOWN_HOST/KNOWN_PORT pair is its known_node (presumably a
# bootstrap peer).  PROXY_PORT serves clients; P2P_PORT serves other nodes.
if len(sys.argv) not in (5, 7):
    # Any other count used to fall into the 5-argument branch and either
    # crash with IndexError or silently misread KNOWN_HOST as OBSERVER_IP.
    sys.stderr.write('usage: %s ARG1 PROXY_PORT P2P_PORT '
                     '[KNOWN_HOST KNOWN_PORT] OBSERVER_IP\n' % sys.argv[0])
    sys.exit(2)
if len(sys.argv) == 7:
    known_node = (sys.argv[4], int(sys.argv[5]))
    obs_ip = sys.argv[6]
else:
    known_node = None
    obs_ip = sys.argv[4]

rsrc = P2prfdProxy()
rsrc.dht_network = entangled_network.EntangledNetwork(int(sys.argv[1]), known_node)
rsrc.p2prfd_http_port = int(sys.argv[3])
rsrc.node_id = rsrc.generate_id()
rsrc.observer = Observer()
rsrc.observer.obs_addr = (obs_ip, 9501)  # observer is always contacted on port 9501
site = twisted.web.server.Site(rsrc)

p2prfd_rsrc = P2prfdResource()
p2prfd_rsrc.rsrc = rsrc
p2prfd_site = twisted.web.server.Site(p2prfd_rsrc)

# Both update loops reschedule themselves after their first run.
update_timer = twisted.internet.reactor.callLater(rsrc.update_interval, rsrc.update)
root_update_timer = twisted.internet.reactor.callLater(rsrc.root_update_interval, rsrc.root_update)
rsrc.update_timer = update_timer

twisted.internet.reactor.listenTCP(int(sys.argv[2]), site)
twisted.internet.reactor.listenTCP(rsrc.p2prfd_http_port, p2prfd_site)
twisted.internet.reactor.run()