1 """P2PRFD HTTP proxy"""
15 import twisted
.internet
.reactor
16 import twisted
.web
.resource
17 import twisted
.web
.server
19 import entangled_network
22 def http_get(self
, queries
):
25 http_helper
.http_get(obs
, queries
)
27 print 'observer is off'
def notify_new_root(self, feed_url, root):
    """Report to the observer that *root* is now the root node of a feed.

    The report is a query mapping with the keys 'action' (always
    'new_root'), 'rss' (the feed URL) and 'root' (``repr`` of *root*),
    handed to ``self.http_get``.

    feed_url -- URL identifying the feed.
    root     -- description of the new root node; only its repr() is sent.
    """
    # Build the mapping in one place so it is always bound before use.
    queries = {
        'action': 'new_root',
        'rss': feed_url,
        'root': repr(root),
    }
    self.http_get(queries)
37 def notify_adopt_child(self
, feed_url
, parent
, child
, loc
):
39 queries
['action'] = 'adopt_child'
40 queries
['rss'] = feed_url
41 queries
['parent'] = repr(parent
)
42 queries
['child'] = repr(child
)
44 self
.http_get(queries
)
47 class P2prfdProxy(twisted
.web
.resource
.Resource
):
48 """HTTP request handler for P2PRFD proxy"""
56 root_update_interval
= 60
57 def generate_id(self
):
59 sh
.update(str(random
.random()))
60 id = sh
.hexdigest()[:7]
def set_root(self, feed_url):
    """Publish this node as the root of *feed_url*.

    Stores ``repr`` of this node's identity (from ``self.get_me``) in the
    DHT under the key ``'root:' + feed_url`` and then informs the observer
    of the new root.
    """
    dht_key = 'root:' + feed_url
    this_node = self.get_me(feed_url)
    serialized = repr(this_node)
    self.dht_network.set_value(dht_key, serialized)
    self.observer.notify_new_root(feed_url, this_node)
68 def insert_new_feed(self
, feed_url
, request
=None):
69 print 'inserting new feed %s' % (feed_url
)
70 def root_gotten(root
):
75 have_new_parent
= False
76 while not have_new_parent
:
77 print 'getting parent'
78 me
= (self
.ip_addr
[feed_url
], self
.p2prfd_http_port
)
79 origin
= self
.get_me(feed_url
)
80 parent
= self
.get_parent_from_root(root
, feed_url
)
81 print 'parent =', repr(parent
)
82 ret
= self
.insert_to_parent(parent
, feed_url
)
83 print 'insert status =', repr(ret
)
84 if not ret
: print 'FAILED inserting'
86 self
.parent_of
[feed_url
] = parent
87 data
= self
.get_data_from_parent(parent
, feed_url
)
91 data
= self
.get_data_from_server(feed_url
)
92 self
.set_root(feed_url
)
93 #dht_network.set_value(root_key, repr((self.ip_addr[feed_url], self.p2prfd_http_port)))
95 data
= self
.get_data_from_server(feed_url
)
97 """tell the DHT that I am the root of this feed"""
98 self
.set_root(feed_url
)
99 #dht_network.set_value(root_key, repr((self.ip_addr[feed_url], self.p2prfd_http_port)))
101 data
= 'should not contain this'
107 self
.cache_data
[feed_url
] = data
108 parsed
= urlparse
.urlparse(feed_url
)
111 if parsed
.port
: http_port
= parsed
.port
112 sock
= socket
.socket()
113 sock
.connect((host
, http_port
))
114 ipaddr
= sock
.getsockname()[0]
115 self
.ip_addr
[feed_url
] = ipaddr
117 dht_network
= self
.dht_network
118 root_key
= 'root:' + feed_url
119 dht_network
.get_value(root_key
, root_gotten
)
def notify_node(self, node, feed_url, content):
    """Push *content* for *feed_url* to *node* via a 'notify_update' RPC.

    Returns the 'status' entry of the RPC reply.
    """
    status = self.http_rpc(node, 'notify_update', feed_url, 'status',
                           data=content)
    return status
123 def try_connection(self
, try_actions
, except_actions
):
126 except Exception, exc
:
127 print 'exception:', exc
129 def notify_children(self
, feed_url
):
130 print 'notify children %s' % (feed_url
)
133 if feed_url
in self
.child_left
: child_left
= self
.child_left
[feed_url
]
134 if feed_url
in self
.child_right
: child_right
= self
.child_right
[feed_url
]
135 me
= (self
.ip_addr
[feed_url
], self
.p2prfd_http_port
)
136 content
= self
.cache_data
[feed_url
]
139 self
.notify_node(child_left
, feed_url
, content
)
140 def except_actions():
141 print 'left child %s error' % (repr(child_left
))
143 self
.observer
.notify_adopt_child(feed_url
, self
.get_me(feed_url
), None, 'left')
144 helper
.del_key(self
.child_left
, feed_url
)
145 self
.try_connection(try_actions
, except_actions
)
148 self
.notify_node(child_right
, feed_url
, content
)
149 def except_actions():
150 print 'right child %s error' % (repr(child_right
))
152 self
.observer
.notify_adopt_child(feed_url
, self
.get_me(feed_url
), None, 'right')
153 helper
.del_key(self
.child_right
, feed_url
)
154 self
.try_connection(try_actions
, except_actions
)
156 def update_feed(self
, feed_url
):
157 if feed_url
in self
.parent_of
:
158 parent
= self
.parent_of
[feed_url
]
160 data
= self
.get_data_from_parent(parent
, feed_url
)
161 self
.cache_data
[feed_url
] = data
163 print 'parent %s error' % (repr(parent
))
164 self
.cache_data
[feed_url
] = 'parent error'
165 del self
.parent_of
[feed_url
]
166 self
.insert_new_feed(feed_url
)
168 data
= self
.get_data_from_server(feed_url
)
169 self
.cache_data
[feed_url
] = data
171 self
.notify_children(feed_url
)
def ping_node(self, node, feed_url):
    """Send a 'ping' RPC for *feed_url* to *node*.

    Returns the 'status' entry of the RPC reply; any exception raised by
    the RPC propagates to the caller.
    """
    status = self.http_rpc(node, 'ping', feed_url, 'status')
    return status
175 def check_children(self
, feed_url
):
176 print 'checking children'
179 if feed_url
in self
.child_left
: child_left
= self
.child_left
[feed_url
]
180 if feed_url
in self
.child_right
: child_right
= self
.child_right
[feed_url
]
181 me
= (self
.ip_addr
[feed_url
], self
.p2prfd_http_port
)
184 self
.ping_node(child_left
, feed_url
)
185 def except_actions():
186 print 'left child %s error' % (repr(child_left
))
187 self
.observer
.notify_adopt_child(feed_url
, self
.get_me(feed_url
), None, 'left')
188 helper
.del_key(self
.child_left
, feed_url
)
189 self
.try_connection(try_actions
, except_actions
)
192 self
.ping_node(child_right
, feed_url
)
193 def except_actions():
194 print 'right child %s error' % (repr(child_right
))
195 self
.observer
.notify_adopt_child(feed_url
, self
.get_me(feed_url
), None, 'right')
196 helper
.del_key(self
.child_right
, feed_url
)
197 self
.try_connection(try_actions
, except_actions
)
198 #print 'check result:'
199 #print 'left child is %s' % (repr(child_left))
200 #print 'right child is %s' % (repr(child_right))
202 def root_update(self
):
203 print 'root updating'
204 for feed_url
in self
.cache_data
:
205 if not (feed_url
in self
.parent_of
):
206 print 'updating %s' % (feed_url
)
207 twisted
.internet
.reactor
.callInThread(self
.update_feed
, feed_url
)
208 #self.update_feed(feed_url)
209 twisted
.internet
.reactor
.callLater(self
.root_update_interval
, self
.root_update
)
211 print 'non-root updating'
212 for feed_url
in self
.cache_data
:
213 print 'updating %s' % (feed_url
)
214 twisted
.internet
.reactor
.callInThread(self
.update_feed
, feed_url
)
215 twisted
.internet
.reactor
.callInThread(self
.check_children
, feed_url
)
216 #self.update_feed(feed_url)
217 #self.check_children(feed_url)
218 twisted
.internet
.reactor
.callLater(self
.update_interval
, self
.update
)
def http_get(self, target, queries):
    """Send *queries* to *target* as an HTTP GET and return the decoded reply.

    target  -- sequence whose first two items are used as (host, port).
    queries -- mapping that is URL-encoded into the query string of '/'.

    Returns the Python object obtained by evaluating the response body.
    """
    http_con = httplib.HTTPConnection(target[0], target[1])
    try:
        http_con.request("GET", '/?' + urllib.urlencode(queries))
        resp_str = http_con.getresponse().read()
    finally:
        # Always release the socket, even when the request fails.
        http_con.close()
    # SECURITY: eval() executes whatever the remote peer sent. This is only
    # acceptable between mutually trusted nodes; a literal-only parser
    # (e.g. ast.literal_eval) should be considered if the wire format
    # allows it.
    result = eval(resp_str)
    return result
def get_data_from_parent(self, parent, key):
    """Fetch the cached feed *key* from *parent* via a 'get_rss' RPC.

    Logs the parent being contacted and returns the 'data' entry of the
    RPC reply.
    """
    parent_label = repr(parent)
    print('fetching from parent %s' % (parent_label))
    feed_data = self.http_rpc(parent, 'get_rss', key, 'data')
    return feed_data
def get_data_from_server(self, addr):
    """Fetch the resource at URL *addr* directly from its origin server.

    addr -- absolute URL; its network location is used to open the
            connection and its path (plus query string, if any) is the
            request target.

    Returns the raw response body.
    """
    parse_result = urlparse.urlparse(addr)
    print('fetching directly from %s' % (addr))
    # An empty path is not a valid request target; fall back to the root.
    path = parse_result.path or '/'
    if parse_result.query:
        path += '?' + parse_result.query
    http_con = httplib.HTTPConnection(parse_result.netloc)
    try:
        http_con.request("GET", path)
        data = http_con.getresponse().read()
    finally:
        # Always release the socket, even when the request fails.
        http_con.close()
    # Callers store the result in their cache, so it must be returned.
    return data
def get_parent_from_root(self, root, feed_url, origin=None):
    """Ask *root* for a suitable parent node for *feed_url*.

    Issues a 'get_parent' RPC carrying no payload, forwards *origin*
    unchanged, and returns the 'parent' entry of the reply.
    """
    chosen_parent = self.http_rpc(root, 'get_parent', feed_url, 'parent',
                                  data=None, origin=origin)
    return chosen_parent
247 def http_rpc(self
, target
, action
, feed_url
, ret_key
, data
=None, origin
=None):
249 queries
['action'] = action
250 queries
['rss'] = feed_url
251 queries
['node_id'] = self
.node_id
252 queries
['from'] = (self
.ip_addr
[feed_url
], self
.p2prfd_http_port
, self
.node_id
)
253 queries
['origin'] = origin
254 if origin
is None: queries
['origin'] = queries
['from']
255 #me = (self.ip_addr[feed_url], self.p2prfd_http_port)
256 me
= self
.get_me(feed_url
)
257 queries
['addr'] = repr(me
)
259 result
= http_helper
.http_get(target
, queries
)
261 queries
['data'] = data
262 result
= http_helper
.http_post(target
, queries
)
263 return result
[ret_key
]
def insert_to_parent(self, parent, feed_url):
    """Ask *parent* to adopt this node as a child for *feed_url*.

    Issues an 'insert_child' RPC and returns the 'status' entry of the
    reply. The caller tests this value to decide whether the insertion
    succeeded, so it must be handed back rather than discarded.
    """
    print('insert_to_parent %s' % (repr(parent)))
    ret = self.http_rpc(parent, 'insert_child', feed_url, 'status')
    return ret
def get_me(self, feed_url):
    """Return this node's identity for *feed_url*.

    The identity is the tuple
    ``(self.ip_addr[feed_url], self.p2prfd_http_port, self.node_id)``.
    Callers serialise it with repr() and compare it with child entries,
    so the tuple must be returned, not just built.

    Looking up *feed_url* in ``self.ip_addr`` fails if no address has
    been recorded for that feed yet.
    """
    me = (self.ip_addr[feed_url], self.p2prfd_http_port, self.node_id)
    return me
272 def render_GET(self
, request
):
273 """beginning of processing the request"""
274 """if I have the data in cache"""
275 if (request
.uri
in self
.cache_data
):
276 data
= self
.cache_data
[request
.uri
]
277 print 'url %s found in cache' % request
.uri
280 self
.insert_new_feed(request
.uri
, request
)
281 return twisted
.web
.server
.NOT_DONE_YET
282 class P2prfdResource(twisted
.web
.resource
.Resource
):
def get_node_info(self, node, feed_url):
    """Query *node* for its tree information about *feed_url*.

    Delegates to the proxy held in ``self.rsrc`` with a 'get_node_info'
    RPC and returns the 'info' entry of the reply.
    """
    proxy = self.rsrc
    node_info = proxy.http_rpc(node, 'get_node_info', feed_url, 'info')
    return node_info
286 def render_POST(self
, request
):
287 queries
= request
.args
289 if 'action' in queries
: action
= queries
['action'][0]
291 result
['status'] = False
292 result
['data'] = 'no data found'
293 #print 'AWAL ACTION %s' % action
294 if action
== 'notify_update':
295 feed_url
= queries
['rss'][0]
296 print 'received update for %s' % feed_url
297 content
= queries
['data'][0]
298 self
.rsrc
.cache_data
[feed_url
] = content
299 twisted
.internet
.reactor
.callInThread(self
.rsrc
.notify_children
, feed_url
)
300 #self.rsrc.notify_children(feed_url)
301 result
['status'] = True
302 #print 'AKHIR ACTION %s' % action
304 def render_GET(self
, request
):
305 parsed
= urlparse
.urlparse(request
.uri
)
306 queries
= cgi
.parse_qs(parsed
.query
)
307 #print 'received queries =', queries
309 if 'action' in queries
: action
= queries
['action'][0]
311 result
['status'] = False
312 result
['data'] = 'no data found'
313 #print 'AWAL ACTION %s' % action
314 if action
== 'get_rss':
315 feed_url
= queries
['rss'][0]
316 if feed_url
in self
.rsrc
.cache_data
:
317 result
['status'] = True
318 result
['data'] = self
.rsrc
.cache_data
[feed_url
]
319 elif action
== 'get_parent':
320 feed_url
= queries
['rss'][0]
323 if feed_url
in self
.rsrc
.child_left
: child_left
= self
.rsrc
.child_left
[feed_url
]
324 if feed_url
in self
.rsrc
.child_right
: child_right
= self
.rsrc
.child_right
[feed_url
]
325 #me = (self.rsrc.ip_addr[feed_url], self.rsrc.p2prfd_http_port)
326 me
= self
.rsrc
.get_me(feed_url
)
327 from_node
= eval(queries
['from'][0])
328 origin
= eval(queries
['origin'][0])
330 #print 'child_left =', repr(child_left)
331 #print 'child_right =', repr(child_right)
332 #print 'from =', repr(from_node)
333 #print 'origin =', repr(origin)
334 if (child_left
== origin
) or (child_right
== origin
):
335 print 'ONE OF CHILD IS FOUND TO BE THE SAME AS GET_PARENT ASKER'
337 result
['status'] = True
338 result
['parent'] = parent
342 left_info
= self
.get_node_info(child_left
, feed_url
)
343 except Exception, exc
:
344 print 'exceptione:', exc
345 print 'left child %s error' % (repr(child_left
))
347 self
.rsrc
.observer
.notify_adopt_child(feed_url
, self
.rsrc
.get_me(feed_url
), None, 'left')
348 helper
.del_key(self
.rsrc
.child_left
, feed_url
)
352 right_info
= self
.get_node_info(child_right
, feed_url
)
353 except Exception, exc
:
354 print 'exceptione:', exc
355 print 'right child %s error' % (repr(child_right
))
357 self
.rsrc
.observer
.notify_adopt_child(feed_url
, self
.rsrc
.get_me(feed_url
), None, 'right')
358 helper
.del_key(self
.rsrc
.child_right
, feed_url
)
360 if child_left
and child_right
:
361 if left_info
['min_depth'] <= right_info
['min_depth']:
362 parent
= self
.rsrc
.get_parent_from_root(child_left
, feed_url
, origin
)
364 parent
= self
.rsrc
.get_parent_from_root(child_right
, feed_url
, origin
)
365 result
['status'] = True
366 result
['parent'] = parent
369 result
['status'] = True
370 result
['parent'] = parent
371 #result['parent'] = (self.rsrc.ip_addr[feed_url], self.rsrc.p2prfd_http_port)
372 print 'left child is %s' % (repr(child_left
))
373 print 'right child is %s' % (repr(child_right
))
375 elif action
== 'get_node_info':
376 feed_url
= queries
['rss'][0]
379 if feed_url
in self
.rsrc
.child_left
: child_left
= self
.rsrc
.child_left
[feed_url
]
380 if feed_url
in self
.rsrc
.child_right
: child_right
= self
.rsrc
.child_right
[feed_url
]
386 left_info
= self
.get_node_info(child_left
, feed_url
)
387 left_depth
= left_info
['depth']
388 left_min_depth
= left_info
['min_depth']
391 right_info
= self
.get_node_info(child_right
, feed_url
)
392 right_depth
= right_info
['depth']
393 right_min_depth
= right_info
['min_depth']
395 depth
= max(left_depth
, right_depth
) + 1
396 min_depth
= min(left_min_depth
, right_min_depth
) + 1
398 info
['depth'] = depth
399 info
['min_depth'] = min_depth
400 print 'depth is %s' % (depth
)
401 result
['status'] = True
402 result
['info'] = info
403 elif action
== 'insert_child':
404 node_id
= eval(queries
['from'][0])[2]
405 if node_id
== self
.rsrc
.node_id
:
406 print 'error connecting, try a moment later'
410 feed_url
= queries
['rss'][0]
413 if feed_url
in self
.rsrc
.child_left
: child_left
= self
.rsrc
.child_left
[feed_url
]
414 if feed_url
in self
.rsrc
.child_right
: child_right
= self
.rsrc
.child_right
[feed_url
]
415 from_node
= eval(queries
['from'][0])
416 if (child_left
== from_node
):
417 result
['status'] = True
418 elif (child_right
== from_node
):
419 result
['status'] = True
421 feed_url
= queries
['rss'][0]
422 addr
= eval(queries
['from'][0])
423 print 'inserting child %s' % (repr(addr
))
424 if not (feed_url
in self
.rsrc
.child_left
):
425 self
.rsrc
.child_left
[feed_url
] = addr
426 parent
= self
.rsrc
.get_me(feed_url
)
428 self
.rsrc
.observer
.notify_adopt_child(feed_url
, parent
, child
, 'left')
429 result
['status'] = True
430 elif not (feed_url
in self
.rsrc
.child_right
):
431 self
.rsrc
.child_right
[feed_url
] = addr
432 parent
= self
.rsrc
.get_me(feed_url
)
434 self
.rsrc
.observer
.notify_adopt_child(feed_url
, parent
, child
, 'right')
435 result
['status'] = True
437 result
['status'] = False
440 elif action
== 'ping':
441 result
['status'] = True
442 result
['node_id'] = self
.rsrc
.node_id
444 result
['status'] = False
445 #print 'AKHIR ACTION %s' % action
449 if len(sys
.argv
) == 7:
450 known_node
= (sys
.argv
[4], int(sys
.argv
[5]))
455 rsrc
.dht_network
= entangled_network
.EntangledNetwork(int(sys
.argv
[1]), known_node
)
456 rsrc
.p2prfd_http_port
= int(sys
.argv
[3])
457 rsrc
.node_id
= rsrc
.generate_id()
458 rsrc
.observer
= Observer()
459 rsrc
.observer
.obs_addr
= (obs_ip
, 9501)
460 site
= twisted
.web
.server
.Site(rsrc
)
461 p2prfd_rsrc
= P2prfdResource()
462 p2prfd_rsrc
.rsrc
= rsrc
463 p2prfd_site
= twisted
.web
.server
.Site(p2prfd_rsrc
)
464 update_timer
= twisted
.internet
.reactor
.callLater(rsrc
.update_interval
, rsrc
.update
)
465 root_update_timer
= twisted
.internet
.reactor
.callLater(rsrc
.root_update_interval
, rsrc
.root_update
)
466 rsrc
.update_timer
= update_timer
467 twisted
.internet
.reactor
.listenTCP(int(sys
.argv
[2]), site
)
468 twisted
.internet
.reactor
.listenTCP(rsrc
.p2prfd_http_port
, p2prfd_site
)
469 twisted
.internet
.reactor
.run()