1 """P2PRFD HTTP proxy"""
15 import twisted
.internet
.reactor
16 import twisted
.web
.resource
17 import twisted
.web
.server
19 import entangled_network
22 def http_get(self
, queries
):
25 http_helper
.http_get(obs
, queries
)
27 print 'observer is off'
30 def notify_new_root(self
, feed_url
, root
):
32 queries
['action'] = 'new_root'
33 queries
['rss'] = feed_url
34 queries
['root'] = repr(root
)
35 self
.http_get(queries
)
37 def notify_adopt_child(self
, feed_url
, parent
, child
, loc
):
39 queries
['action'] = 'adopt_child'
40 queries
['rss'] = feed_url
41 queries
['parent'] = repr(parent
)
42 queries
['child'] = repr(child
)
44 self
.http_get(queries
)
47 class P2prfdProxy(twisted
.web
.resource
.Resource
):
48 """HTTP request handler for P2PRFD proxy"""
56 root_update_interval
= 5
57 def generate_id(self
):
59 sh
.update(str(random
.random()))
60 id = sh
.hexdigest()[:7]
62 def set_root(self
, feed_url
):
63 root_key
= 'root:' + feed_url
64 me
= self
.get_me(feed_url
)
65 self
.dht_network
.set_value(root_key
, repr(me
))
66 self
.observer
.notify_new_root(feed_url
, me
)
68 def insert_new_feed(self
, feed_url
, request
=None):
69 print 'inserting new feed %s' % (feed_url
)
70 def root_gotten(root
):
75 print 'getting parent'
76 parent
= self
.get_parent_from_root(root
, feed_url
)
77 print 'parent =', repr(parent
)
78 me
= (self
.ip_addr
[feed_url
], self
.p2prfd_http_port
)
79 self
.insert_to_parent(parent
, feed_url
)
80 self
.parent_of
[feed_url
] = parent
81 data
= self
.get_data_from_parent(parent
, feed_url
)
85 data
= self
.get_data_from_server(feed_url
)
86 self
.set_root(feed_url
)
87 #dht_network.set_value(root_key, repr((self.ip_addr[feed_url], self.p2prfd_http_port)))
89 data
= self
.get_data_from_server(feed_url
)
91 """tell the DHT that I am the root of this feed"""
92 self
.set_root(feed_url
)
93 #dht_network.set_value(root_key, repr((self.ip_addr[feed_url], self.p2prfd_http_port)))
95 data
= 'should not contain this'
101 self
.cache_data
[feed_url
] = data
102 parsed
= urlparse
.urlparse(feed_url
)
105 if parsed
.port
: http_port
= parsed
.port
106 sock
= socket
.socket()
107 sock
.connect((host
, http_port
))
108 ipaddr
= sock
.getsockname()[0]
109 self
.ip_addr
[feed_url
] = ipaddr
111 dht_network
= self
.dht_network
112 root_key
= 'root:' + feed_url
113 dht_network
.get_value(root_key
, root_gotten
)
115 def notify_node(self
, node
, feed_url
, content
):
116 return self
.http_rpc(node
, 'notify_update', feed_url
, 'status', content
)
117 def notify_children(self
, feed_url
):
118 print 'notify children %s' % (feed_url
)
121 if feed_url
in self
.child_left
: child_left
= self
.child_left
[feed_url
]
122 if feed_url
in self
.child_right
: child_right
= self
.child_right
[feed_url
]
123 me
= (self
.ip_addr
[feed_url
], self
.p2prfd_http_port
)
124 content
= self
.cache_data
[feed_url
]
127 self
.notify_node(child_left
, feed_url
, content
)
129 print 'left child %s error' % (repr(child_left
))
131 self
.observer
.notify_adopt_child(feed_url
, self
.get_me(feed_url
), None, 'left')
132 helper
.del_key(self
.child_left
, feed_url
)
136 self
.notify_node(child_right
, feed_url
, content
)
138 print 'right child %s error' % (repr(child_right
))
140 self
.observer
.notify_adopt_child(feed_url
, self
.get_me(feed_url
), None, 'right')
141 helper
.del_key(self
.child_right
, feed_url
)
144 def update_feed(self
, feed_url
):
145 if feed_url
in self
.parent_of
:
146 parent
= self
.parent_of
[feed_url
]
148 data
= self
.get_data_from_parent(parent
, feed_url
)
149 self
.cache_data
[feed_url
] = data
151 print 'parent %s error' % (repr(parent
))
152 self
.cache_data
[feed_url
] = 'parent error'
153 del self
.parent_of
[feed_url
]
154 self
.insert_new_feed(feed_url
)
156 data
= self
.get_data_from_server(feed_url
)
157 self
.cache_data
[feed_url
] = data
159 self
.notify_children(feed_url
)
161 def ping_node(self
, node
, feed_url
):
162 return self
.http_rpc(node
, 'ping', feed_url
, 'status')
163 def check_children(self
, feed_url
):
164 print 'checking children'
167 if feed_url
in self
.child_left
: child_left
= self
.child_left
[feed_url
]
168 if feed_url
in self
.child_right
: child_right
= self
.child_right
[feed_url
]
169 me
= (self
.ip_addr
[feed_url
], self
.p2prfd_http_port
)
172 self
.ping_node(child_left
, feed_url
)
174 print 'left child %s error' % (repr(child_left
))
176 self
.observer
.notify_adopt_child(feed_url
, self
.get_me(feed_url
), None, 'left')
177 helper
.del_key(self
.child_left
, feed_url
)
181 self
.ping_node(child_right
, feed_url
)
183 print 'right child %s error' % (repr(child_right
))
185 self
.observer
.notify_adopt_child(feed_url
, self
.get_me(feed_url
), None, 'right')
186 helper
.del_key(self
.child_right
, feed_url
)
188 #print 'check result:'
189 #print 'left child is %s' % (repr(child_left))
190 #print 'right child is %s' % (repr(child_right))
192 def root_update(self
):
193 print 'root updating'
194 for feed_url
in self
.cache_data
:
195 if not (feed_url
in self
.parent_of
):
196 print 'updating %s' % (feed_url
)
197 twisted
.internet
.reactor
.callInThread(self
.update_feed
, feed_url
)
198 #self.update_feed(feed_url)
199 twisted
.internet
.reactor
.callLater(self
.root_update_interval
, self
.root_update
)
201 print 'non-root updating'
202 for feed_url
in self
.cache_data
:
203 print 'updating %s' % (feed_url
)
204 twisted
.internet
.reactor
.callInThread(self
.update_feed
, feed_url
)
205 twisted
.internet
.reactor
.callInThread(self
.check_children
, feed_url
)
206 #self.update_feed(feed_url)
207 #self.check_children(feed_url)
208 twisted
.internet
.reactor
.callLater(self
.update_interval
, self
.update
)
210 def http_get(self
, target
, queries
):
211 http_con
= httplib
.HTTPConnection(target
[0], target
[1])
212 http_con
.request("GET", '/?' + urllib
.urlencode(queries
))
213 response
= http_con
.getresponse()
214 resp_str
= response
.read()
215 result
= eval(resp_str
)
218 def get_data_from_parent(self
, parent
, key
):
219 print 'fetching from parent %s' % (repr(parent
))
220 return self
.http_rpc(parent
, 'get_rss', key
, 'data')
221 def get_data_from_server(self
, addr
):
222 """get a resource directly from addr"""
223 parse_result
= urlparse
.urlparse(addr
)
224 print 'fetching directly from %s' % (addr
)
225 http_con
= httplib
.HTTPConnection(parse_result
.netloc
)
226 path
= parse_result
.path
227 if parse_result
.query
:
228 path
+= '?' + parse_result
.query
229 http_con
.request("GET", path
)
230 response
= http_con
.getresponse()
231 data
= response
.read()
234 def get_parent_from_root(self
, root
, feed_url
):
235 """find a good parent from this root"""
236 return self
.http_rpc(root
, 'get_parent', feed_url
, 'parent')
237 def http_rpc(self
, target
, action
, feed_url
, ret_key
, data
= None):
239 queries
['action'] = action
240 queries
['rss'] = feed_url
241 queries
['node_id'] = self
.node_id
242 queries
['from'] = (self
.ip_addr
[feed_url
], self
.p2prfd_http_port
, self
.node_id
)
243 #me = (self.ip_addr[feed_url], self.p2prfd_http_port)
244 me
= self
.get_me(feed_url
)
245 queries
['addr'] = repr(me
)
247 result
= http_helper
.http_get(target
, queries
)
249 queries
['data'] = data
250 result
= http_helper
.http_post(target
, queries
)
251 return result
[ret_key
]
253 def insert_to_parent(self
, parent
, feed_url
):
254 print 'insert_to_parent %s' % (repr(parent
))
255 ret
= self
.http_rpc(parent
, 'insert_child', feed_url
, 'status')
257 def get_me(self
, feed_url
):
258 me
= (self
.ip_addr
[feed_url
], self
.p2prfd_http_port
, self
.node_id
)
260 def render_GET(self
, request
):
261 """beginning of processing the request"""
262 """if I have the data in cache"""
263 if (request
.uri
in self
.cache_data
):
264 data
= self
.cache_data
[request
.uri
]
265 print 'url %s found in cache' % request
.uri
268 self
.insert_new_feed(request
.uri
, request
)
269 return twisted
.web
.server
.NOT_DONE_YET
270 class P2prfdResource(twisted
.web
.resource
.Resource
):
272 def get_node_info(self
, node
, feed_url
):
273 return self
.rsrc
.http_rpc(node
, 'get_node_info', feed_url
, 'info')
274 def render_POST(self
, request
):
275 queries
= request
.args
277 if 'action' in queries
: action
= queries
['action'][0]
279 result
['status'] = False
280 result
['data'] = 'no data found'
281 #print 'AWAL ACTION %s' % action
282 if action
== 'notify_update':
283 feed_url
= queries
['rss'][0]
284 print 'received update for %s' % feed_url
285 content
= queries
['data'][0]
286 self
.rsrc
.cache_data
[feed_url
] = content
287 twisted
.internet
.reactor
.callInThread(self
.rsrc
.notify_children
, feed_url
)
288 #self.rsrc.notify_children(feed_url)
289 result
['status'] = True
290 #print 'AKHIR ACTION %s' % action
292 def render_GET(self
, request
):
293 parsed
= urlparse
.urlparse(request
.uri
)
294 queries
= cgi
.parse_qs(parsed
.query
)
295 #print 'received queries =', queries
297 if 'action' in queries
: action
= queries
['action'][0]
299 result
['status'] = False
300 result
['data'] = 'no data found'
301 #print 'AWAL ACTION %s' % action
302 if action
== 'get_rss':
303 feed_url
= queries
['rss'][0]
304 if feed_url
in self
.rsrc
.cache_data
:
305 result
['status'] = True
306 result
['data'] = self
.rsrc
.cache_data
[feed_url
]
307 elif action
== 'get_parent':
308 feed_url
= queries
['rss'][0]
311 if feed_url
in self
.rsrc
.child_left
: child_left
= self
.rsrc
.child_left
[feed_url
]
312 if feed_url
in self
.rsrc
.child_right
: child_right
= self
.rsrc
.child_right
[feed_url
]
313 #me = (self.rsrc.ip_addr[feed_url], self.rsrc.p2prfd_http_port)
314 me
= self
.rsrc
.get_me(feed_url
)
317 left_info
= self
.get_node_info(child_left
, feed_url
)
319 print 'left child %s error' % (repr(child_left
))
321 self
.rsrc
.observer
.notify_adopt_child(feed_url
, self
.rsrc
.get_me(feed_url
), None, 'left')
322 helper
.del_key(self
.rsrc
.child_left
, feed_url
)
326 right_info
= self
.get_node_info(child_right
, feed_url
)
328 print 'right child %s error' % (repr(child_right
))
330 self
.rsrc
.observer
.notify_adopt_child(feed_url
, self
.rsrc
.get_me(feed_url
), None, 'right')
331 helper
.del_key(self
.rsrc
.child_right
, feed_url
)
333 if child_left
and child_right
:
334 if left_info
['min_depth'] <= right_info
['min_depth']:
335 parent
= self
.rsrc
.get_parent_from_root(child_left
, feed_url
)
337 parent
= self
.rsrc
.get_parent_from_root(child_right
, feed_url
)
338 result
['status'] = True
339 result
['parent'] = parent
342 result
['status'] = True
343 result
['parent'] = parent
344 #result['parent'] = (self.rsrc.ip_addr[feed_url], self.rsrc.p2prfd_http_port)
345 print 'left child is %s' % (repr(child_left
))
346 print 'right child is %s' % (repr(child_right
))
348 elif action
== 'get_node_info':
349 feed_url
= queries
['rss'][0]
352 if feed_url
in self
.rsrc
.child_left
: child_left
= self
.rsrc
.child_left
[feed_url
]
353 if feed_url
in self
.rsrc
.child_right
: child_right
= self
.rsrc
.child_right
[feed_url
]
359 left_info
= self
.get_node_info(child_left
, feed_url
)
360 left_depth
= left_info
['depth']
361 left_min_depth
= left_info
['min_depth']
364 right_info
= self
.get_node_info(child_right
, feed_url
)
365 right_depth
= right_info
['depth']
366 right_min_depth
= right_info
['min_depth']
368 depth
= max(left_depth
, right_depth
) + 1
369 min_depth
= min(left_min_depth
, right_min_depth
) + 1
371 info
['depth'] = depth
372 info
['min_depth'] = min_depth
373 print 'depth is %s' % (depth
)
374 result
['status'] = True
375 result
['info'] = info
376 elif action
== 'insert_child':
377 node_id
= queries
['from'][0][2]
378 if node_id
== self
.rsrc
.node_id
:
379 print 'error connecting, try a moment later'
383 feed_url
= queries
['rss'][0]
384 addr
= eval(queries
['from'][0])
385 print 'inserting child %s' % (repr(addr
))
386 if not (feed_url
in self
.rsrc
.child_left
):
387 self
.rsrc
.child_left
[feed_url
] = addr
388 parent
= self
.rsrc
.get_me(feed_url
)
390 self
.rsrc
.observer
.notify_adopt_child(feed_url
, parent
, child
, 'left')
391 result
['status'] = True
392 elif not (feed_url
in self
.rsrc
.child_right
):
393 self
.rsrc
.child_right
[feed_url
] = addr
394 parent
= self
.rsrc
.get_me(feed_url
)
396 self
.rsrc
.observer
.notify_adopt_child(feed_url
, parent
, child
, 'right')
397 result
['status'] = True
399 result
['status'] = False
402 elif action
== 'ping':
403 result
['status'] = True
404 result
['node_id'] = self
.rsrc
.node_id
406 result
['status'] = False
407 #print 'AKHIR ACTION %s' % action
411 if len(sys
.argv
) == 7:
412 known_node
= (sys
.argv
[4], int(sys
.argv
[5]))
417 rsrc
.dht_network
= entangled_network
.EntangledNetwork(int(sys
.argv
[1]), known_node
)
418 rsrc
.p2prfd_http_port
= int(sys
.argv
[3])
419 rsrc
.node_id
= rsrc
.generate_id()
420 rsrc
.observer
= Observer()
421 rsrc
.observer
.obs_addr
= (obs_ip
, 9501)
422 site
= twisted
.web
.server
.Site(rsrc
)
423 p2prfd_rsrc
= P2prfdResource()
424 p2prfd_rsrc
.rsrc
= rsrc
425 p2prfd_site
= twisted
.web
.server
.Site(p2prfd_rsrc
)
426 update_timer
= twisted
.internet
.reactor
.callLater(rsrc
.update_interval
, rsrc
.update
)
427 root_update_timer
= twisted
.internet
.reactor
.callLater(rsrc
.root_update_interval
, rsrc
.root_update
)
428 rsrc
.update_timer
= update_timer
429 twisted
.internet
.reactor
.listenTCP(int(sys
.argv
[2]), site
)
430 twisted
.internet
.reactor
.listenTCP(rsrc
.p2prfd_http_port
, p2prfd_site
)
431 twisted
.internet
.reactor
.run()