1 """P2PRFD HTTP proxy"""
13 import twisted
.internet
.reactor
14 import twisted
.web
.resource
15 import twisted
.web
.server
17 import entangled_network
19 class P2prfdProxy(twisted
.web
.resource
.Resource
):
20 """HTTP request handler for P2PRFD proxy"""
def generate_id(self):
    """Return a fresh, effectively unique identifier for this node.

    The identifier is the SHA-1 hex digest of a random number, i.e. a plain
    40-character string. It has to be a string because it is sent to peers
    as a URL query value and compared with ``==`` on the receiving side.

    NOTE(review): the hash construction was not visible in the reviewed
    listing; SHA-1 over ``str(random.random())`` is a reconstruction --
    confirm against the original if ids must stay compatible.
    """
    # Local imports keep the method independent of the module's import list.
    import hashlib
    import random

    sh = hashlib.sha1()
    # Hash objects consume bytes; the text is pure ASCII so this is lossless.
    sh.update(str(random.random()).encode('ascii'))
    return sh.hexdigest()
# Join the distribution tree for feed_url: record the local IP address used to
# reach the feed's host, then look up the DHT key 'root:' + feed_url and pass
# the nested root_gotten function along with that lookup.
# NOTE(review): this listing appears to have lost lines. The keywords that
# separate the "fetch from parent" and "fetch from server" paths inside
# root_gotten, the definition of `host`, the default value of `http_port` and
# any use of `request` are not visible here -- confirm against the full file
# before relying on the control flow.
33 def insert_new_feed(self
, feed_url
, request
=None):
34 print 'inserting new feed %s' % (feed_url
)
# Handed to dht_network.get_value() at the end of this method. `root` is passed
# on to get_parent_from_root(), which indexes it as root[0] / root[1].
35 def root_gotten(root
):
40 print 'getting parent'
41 parent
= self
.get_parent_from_root(root
, feed_url
)
42 print 'parent =', repr(parent
)
# Pair made of the local IP recorded for this feed and this node's P2PRFD HTTP
# port; it is what gets sent to the parent as this node's address.
43 me
= (self
.ip_addr
[feed_url
], self
.p2prfd_http_port
)
44 self
.insert_to_parent(parent
, me
, feed_url
)
45 self
.parent_of
[feed_url
] = parent
46 data
= self
.get_data_from_parent(parent
, feed_url
)
# NOTE(review): the keyword introducing this fallback (presumably an except or
# else) is not visible. The feed is fetched from its origin server and this
# node's address is stored in the DHT under root_key.
48 data
= self
.get_data_from_server(feed_url
)
49 dht_network
.set_value(root_key
, repr((self
.ip_addr
[feed_url
], self
.p2prfd_http_port
)))
51 data
= self
.get_data_from_server(feed_url
)
53 """tell the DHT that I am the root of this feed"""
54 dht_network
.set_value(root_key
, repr((self
.ip_addr
[feed_url
], self
.p2prfd_http_port
)))
# NOTE(review): placeholder value; the condition guarding it is not visible.
56 data
= 'should not contain this'
# Keep the feed body in the local cache, keyed by feed URL.
62 self
.cache_data
[feed_url
] = data
63 parsed
= urlparse
.urlparse(feed_url
)
# An explicit port in the URL overrides http_port.
# NOTE(review): `host` and the initial `http_port` are not defined in the
# visible lines.
66 if parsed
.port
: http_port
= parsed
.port
# Connect to the feed host and read back the local address of that socket;
# this is the IP stored for the feed. NOTE(review): no sock.close() is visible.
67 sock
= socket
.socket()
68 sock
.connect((host
, http_port
))
69 ipaddr
= sock
.getsockname()[0]
70 self
.ip_addr
[feed_url
] = ipaddr
72 dht_network
= self
.dht_network
73 root_key
= 'root:' + feed_url
# root_gotten is passed as the second argument, presumably a completion
# callback that receives the stored value -- TODO confirm against
# entangled_network.
74 dht_network
.get_value(root_key
, root_gotten
)
def update_feed(self, feed_url):
    """Refresh the cached copy of ``feed_url``.

    * No parent recorded for the feed: fetch it from the origin server.
    * Parent recorded: fetch it from the parent. If that fails, cache the
      placeholder ``'parent error'``, forget the parent and re-join the
      tree through ``insert_new_feed``.

    The result is stored in ``self.cache_data[feed_url]``; nothing is
    returned.
    """
    if feed_url not in self.parent_of:
        self.cache_data[feed_url] = self.get_data_from_server(feed_url)
        return

    parent = self.parent_of[feed_url]
    try:
        data = self.get_data_from_parent(parent, feed_url)
    except Exception:
        # The parent is unreachable or answered garbage: drop it and look
        # for a new place in the tree.
        print('parent %s error' % (repr(parent)))
        self.cache_data[feed_url] = 'parent error'
        del self.parent_of[feed_url]
        self.insert_new_feed(feed_url)
    else:
        self.cache_data[feed_url] = data
def ping_node(self, node):
    """Send a ``ping`` action to ``node`` and wait for its reply.

    node -- ``(host, port)`` pair of the peer's P2PRFD HTTP endpoint.

    Returns nothing. Any exception raised by ``httplib`` (connection
    refused, no reply, ...) propagates to the caller.
    """
    http_con = httplib.HTTPConnection(node[0], node[1])
    try:
        queries = {'action': 'ping'}
        http_con.request("GET", '/?' + urllib.urlencode(queries))
        # Read the reply so that a peer which accepts the connection but
        # never answers is not mistaken for a live one.
        http_con.getresponse().read()
    finally:
        # Release the socket whether or not the peer answered.
        http_con.close()
def check_children(self, feed_url):
    """Ping the children registered for ``feed_url`` and drop dead ones.

    For each of ``self.child_left`` and ``self.child_right`` the child
    stored under ``feed_url`` (if any) is pinged. A child whose ping raises
    is reported and removed from its table. The surviving children are
    printed at the end; nothing is returned.
    """
    print('checking children')
    survivors = {}
    for side, table in (('left', self.child_left),
                        ('right', self.child_right)):
        child = table.get(feed_url)
        if child:
            try:
                self.ping_node(child)
            except Exception:
                print('%s child %s error' % (side, repr(child)))
                child = None
                del table[feed_url]
        survivors[side] = child
    print('check result:')
    print('left child is %s' % (repr(survivors['left'])))
    print('right child is %s' % (repr(survivors['right'])))
def update(self):
    """Refresh every cached feed, check its children, then reschedule.

    For each feed URL in ``self.cache_data`` this calls ``update_feed`` and
    ``check_children``. Afterwards it schedules itself again on the Twisted
    reactor after ``self.update_interval`` seconds.
    """
    try:
        # Iterate over a snapshot of the keys so that cache changes made
        # while refreshing cannot disturb the loop.
        for feed_url in list(self.cache_data):
            print('updating %s' % (feed_url))
            self.update_feed(feed_url)
            self.check_children(feed_url)
    finally:
        # Always re-arm the timer; otherwise a single failing feed would
        # silently stop all future refreshes.
        twisted.internet.reactor.callLater(self.update_interval, self.update)
def get_data_from_parent(self, parent, key):
    """Fetch the feed identified by ``key`` from a parent node.

    parent -- ``(host, port)`` pair of the parent's P2PRFD HTTP endpoint.
    key    -- feed URL, sent as the ``rss`` query parameter.

    Returns the ``'data'`` entry of the parent's reply. Exceptions raised
    by ``httplib`` or by parsing the reply propagate to the caller.

    NOTE(review): the reply's ``'status'`` flag is not checked here, so a
    parent that answers "no data found" has that text returned as feed
    content -- confirm whether that is intended.
    """
    import ast

    print('fetching from parent %s' % (repr(parent)))
    http_con = httplib.HTTPConnection(parent[0], parent[1])
    try:
        # The serving side reads queries['rss'] for 'get_rss', so the feed
        # URL has to travel with the action.
        queries = {'action': 'get_rss', 'rss': key}
        http_con.request("GET", '/?' + urllib.urlencode(queries))
        resp_str = http_con.getresponse().read()
    finally:
        http_con.close()
    # SECURITY: the reply comes from a remote peer. It used to be passed to
    # eval(), which lets any peer run code here; literal_eval only accepts
    # Python literals, which is all a repr()'d result dict contains.
    result = ast.literal_eval(resp_str)
    return result['data']
def get_data_from_server(self, addr):
    """Fetch a resource directly from its origin server.

    addr -- full URL of the resource.

    Returns the response body. Exceptions raised by ``httplib`` propagate
    to the caller.
    """
    parse_result = urlparse.urlparse(addr)
    print('fetching directly from %s' % (addr))
    # A URL such as 'http://host' has an empty path, but an HTTP request
    # line needs at least '/'.
    path = parse_result.path or '/'
    if parse_result.query:
        path += '?' + parse_result.query
    http_con = httplib.HTTPConnection(parse_result.netloc)
    try:
        http_con.request("GET", path)
        return http_con.getresponse().read()
    finally:
        # Release the socket on success and on failure alike.
        http_con.close()
def get_parent_from_root(self, root, feed_url):
    """Ask ``root`` which node a newcomer should attach to for ``feed_url``.

    root     -- ``(host, port)`` pair of the node to ask.
    feed_url -- feed URL, sent as the ``rss`` query parameter.

    Returns the ``'parent'`` entry of the reply. Exceptions raised by
    ``httplib`` or by parsing the reply propagate to the caller.
    """
    import ast

    http_con = httplib.HTTPConnection(root[0], root[1])
    try:
        queries = {'action': 'get_parent', 'rss': feed_url}
        http_con.request("GET", '/?' + urllib.urlencode(queries))
        resp_str = http_con.getresponse().read()
    finally:
        http_con.close()
    # SECURITY: the reply comes from a remote peer. It used to be passed to
    # eval(); literal_eval only accepts Python literals, which is all a
    # repr()'d result dict contains.
    result = ast.literal_eval(resp_str)
    # Callers assign this method's result, so the parent must be returned.
    return result['parent']
def insert_to_parent(self, parent, me, feed_url):
    """Ask ``parent`` to register this node as one of its children.

    parent   -- ``(host, port)`` pair of the prospective parent.
    me       -- this node's own address pair; sent as ``repr(me)``.
    feed_url -- feed URL, sent as the ``rss`` query parameter.

    Returns the ``'status'`` entry of the parent's reply. Exceptions
    raised by ``httplib`` or by parsing the reply propagate to the caller.
    """
    import ast

    print('insert_to_parent')
    http_con = httplib.HTTPConnection(parent[0], parent[1])
    try:
        queries = {
            'action': 'insert_child',
            'rss': feed_url,
            # Lets the receiver recognise a request that came from itself.
            'node_id': self.node_id,
            'addr': repr(me),
        }
        http_con.request("GET", '/?' + urllib.urlencode(queries))
        resp_str = http_con.getresponse().read()
    finally:
        http_con.close()
    # SECURITY: the reply comes from a remote peer. It used to be passed to
    # eval(); literal_eval only accepts Python literals, which is all a
    # repr()'d result dict contains.
    result = ast.literal_eval(resp_str)
    return result['status']
def render_GET(self, request):
    """Serve a proxied feed request.

    If the requested URI is already cached, the cached body is returned
    straight away. Otherwise the feed is registered through
    ``insert_new_feed`` (which receives the request) and
    ``NOT_DONE_YET`` is returned.

    NOTE(review): ``insert_new_feed`` is presumably responsible for
    writing to and finishing ``request`` later; that code is not visible
    in the reviewed listing -- confirm.
    """
    feed_url = request.uri
    if feed_url in self.cache_data:
        print('url %s found in cache' % feed_url)
        # A cache hit must answer now rather than fall through to a new
        # insertion.
        return self.cache_data[feed_url]
    self.insert_new_feed(feed_url, request)
    return twisted.web.server.NOT_DONE_YET
194 class P2prfdResource(twisted
.web
.resource
.Resource
):
def get_node_info(self, node, feed_url):
    """Ask ``node`` for its node info regarding ``feed_url``.

    node     -- ``(host, port)`` pair of the peer to ask.
    feed_url -- feed URL, sent as the ``rss`` query parameter.

    Returns the ``'info'`` entry of the reply; callers in this file read
    its ``'depth'`` key. Exceptions raised by ``httplib`` or by parsing
    the reply propagate to the caller.
    """
    import ast

    http_con = httplib.HTTPConnection(node[0], node[1])
    try:
        queries = {'action': 'get_node_info', 'rss': feed_url}
        http_con.request("GET", '/?' + urllib.urlencode(queries))
        resp_str = http_con.getresponse().read()
    finally:
        http_con.close()
    # SECURITY: the reply comes from a remote peer. It used to be passed to
    # eval(); literal_eval only accepts Python literals, which is all a
    # repr()'d result dict contains.
    result = ast.literal_eval(resp_str)
    return result['info']
# Handle a GET on the P2PRFD control port. The query string of request.uri is
# parsed with cgi.parse_qs and the first 'action' value selects what to do:
# 'get_rss', 'get_parent', 'get_node_info', 'insert_child' or 'ping'.
# The outcome is collected in the `result` mapping: a 'status' flag plus an
# action-specific entry.
# NOTE(review): this listing appears to have lost lines. The creation of
# `result` and `info`, the defaults for `action`, `child_left`, `child_right`,
# `left_depth` and `right_depth`, the try/except/else keywords around the
# child look-ups, and the final return are not visible. Confirm against the
# complete file before relying on the control flow described below.
206 def render_GET(self
, request
):
207 parsed
= urlparse
.urlparse(request
.uri
)
208 queries
= cgi
.parse_qs(parsed
.query
)
209 print 'queries =', queries
# parse_qs maps each name to a list of values; only the first one is used.
211 if 'action' in queries
: action
= queries
['action'][0]
# Pessimistic defaults, overwritten by the branches below.
213 result
['status'] = False
214 result
['data'] = 'no data found'
# get_rss: hand out the proxy's cached copy of the feed, if it has one.
215 if action
== 'get_rss':
216 feed_url
= queries
['rss'][0]
217 if feed_url
in self
.rsrc
.cache_data
:
218 result
['status'] = True
219 result
['data'] = self
.rsrc
.cache_data
[feed_url
]
# get_parent: tell the asking node where it should attach for this feed.
220 elif action
== 'get_parent':
221 feed_url
= queries
['rss'][0]
224 if feed_url
in self
.rsrc
.child_left
: child_left
= self
.rsrc
.child_left
[feed_url
]
225 if feed_url
in self
.rsrc
.child_right
: child_right
= self
.rsrc
.child_right
[feed_url
]
# Pair made of the proxy's recorded IP for this feed and its P2PRFD HTTP port.
226 me
= (self
.rsrc
.ip_addr
[feed_url
], self
.rsrc
.p2prfd_http_port
)
# Ask each child for its info. The error prints and `del` statements that
# follow drop a child from the proxy's table, presumably only when the
# request to it fails -- the guarding keywords are not visible, TODO confirm.
229 left_info
= self
.get_node_info(child_left
, feed_url
)
231 print 'left child %s error' % (repr(child_left
))
233 del self
.rsrc
.child_left
[feed_url
]
237 right_info
= self
.get_node_info(child_right
, feed_url
)
239 print 'right child %s error' % (repr(child_right
))
241 del self
.rsrc
.child_right
[feed_url
]
# With both children present the question is forwarded to a child: the left
# one when its reported depth is not larger than the right one's. The
# right-child call that follows is presumably the else branch.
243 if child_left
and child_right
:
244 if left_info
['depth'] <= right_info
['depth']:
245 parent
= self
.rsrc
.get_parent_from_root(child_left
, feed_url
)
247 parent
= self
.rsrc
.get_parent_from_root(child_right
, feed_url
)
248 result
['status'] = True
249 result
['parent'] = parent
# NOTE(review): the value of `parent` for this second assignment presumably
# comes from a branch that is not visible; the commented-out line below would
# answer with this node's own address.
252 result
['status'] = True
253 result
['parent'] = parent
254 #result['parent'] = (self.rsrc.ip_addr[feed_url], self.rsrc.p2prfd_http_port)
255 print 'left child is %s' % (repr(child_left
))
256 print 'right child is %s' % (repr(child_right
))
# get_node_info: report a depth computed as max(left_depth, right_depth) + 1,
# where each child's depth is obtained by asking that child.
258 elif action
== 'get_node_info':
259 feed_url
= queries
['rss'][0]
262 if feed_url
in self
.rsrc
.child_left
: child_left
= self
.rsrc
.child_left
[feed_url
]
263 if feed_url
in self
.rsrc
.child_right
: child_right
= self
.rsrc
.child_right
[feed_url
]
267 left_info
= self
.get_node_info(child_left
, feed_url
)
268 left_depth
= left_info
['depth']
271 right_info
= self
.get_node_info(child_right
, feed_url
)
272 right_depth
= right_info
['depth']
274 depth
= max(left_depth
, right_depth
) + 1
276 info
['depth'] = depth
277 print 'depth is %s' % (depth
)
278 result
['status'] = True
279 result
['info'] = info
# insert_child: register the requesting node as a child for the feed.
280 elif action
== 'insert_child':
281 node_id
= queries
['node_id'][0]
# A request carrying this node's own node_id is reported as an error.
# NOTE(review): what happens next in that case is not visible.
282 if node_id
== self
.rsrc
.node_id
:
283 print 'error connecting, try a moment later'
287 feed_url
= queries
['rss'][0]
# SECURITY NOTE(review): eval() runs on a string taken straight from the
# request's query string, so any client can execute code here. Consider
# ast.literal_eval.
288 addr
= eval(queries
['addr'][0])
289 print 'inserting child %s' % (repr(addr
))
# The left slot is filled first, then the right one; the False status that
# follows is presumably the case where both are taken.
290 if not (feed_url
in self
.rsrc
.child_left
):
291 self
.rsrc
.child_left
[feed_url
] = addr
292 result
['status'] = True
293 elif not (feed_url
in self
.rsrc
.child_right
):
294 self
.rsrc
.child_right
[feed_url
] = addr
295 result
['status'] = True
297 result
['status'] = False
# ping: answer with status True and this node's id.
300 elif action
== 'ping':
301 result
['status'] = True
302 result
['node_id'] = self
.rsrc
.node_id
# NOTE(review): presumably the fallback for an unrecognised action.
304 result
['status'] = False
# Script body: wire the proxy, the peer-facing resource and the DHT together
# and start the Twisted reactor.
#
# Arguments as used below:
#   argv[1]  integer passed to EntangledNetwork (presumably the DHT port)
#   argv[2]  TCP port the proxy site listens on
#   argv[3]  TCP port the P2PRFD (peer-to-peer) site listens on
#   argv[4], argv[5]  optional host and port of an already known node
#
# NOTE(review): the construction of `rsrc` and the value used for `known_node`
# when no known node is given were not visible in the reviewed listing;
# `P2prfdProxy()` and `None` are reconstructions -- confirm against the
# original file and the EntangledNetwork signature.
known_node = None
if len(sys.argv) == 6:
    known_node = (sys.argv[4], int(sys.argv[5]))

rsrc = P2prfdProxy()
rsrc.dht_network = entangled_network.EntangledNetwork(int(sys.argv[1]), known_node)
rsrc.p2prfd_http_port = int(sys.argv[3])
rsrc.node_id = rsrc.generate_id()
site = twisted.web.server.Site(rsrc)

# The peer-facing resource answers on its own port and works on the proxy's
# state through its `rsrc` attribute.
p2prfd_rsrc = P2prfdResource()
p2prfd_rsrc.rsrc = rsrc
p2prfd_site = twisted.web.server.Site(p2prfd_rsrc)

# First periodic refresh; the handle is kept on the proxy.
update_timer = twisted.internet.reactor.callLater(rsrc.update_interval, rsrc.update)
rsrc.update_timer = update_timer

twisted.internet.reactor.listenTCP(int(sys.argv[2]), site)
twisted.internet.reactor.listenTCP(rsrc.p2prfd_http_port, p2prfd_site)
twisted.internet.reactor.run()