added delete key
[p2prfd.git] / p2prfd.py
blob6ae9187792c77bb5e3d919cdd1b8980df89e4ec1
1 """P2PRFD HTTP proxy"""
3 import cgi
4 import hashlib
5 import httplib
6 import random
7 import socket
8 import sys
9 import threading
10 import urllib
11 import urlparse
13 import twisted.internet.reactor
14 import twisted.web.resource
15 import twisted.web.server
17 import entangled_network
class P2prfdProxy(twisted.web.resource.Resource):
    """HTTP request handler for P2PRFD proxy.

    A request for a URL that is already in ``cache_data`` is answered from
    the cache.  Any other URL is treated as a new feed: the node looks up
    the feed's root in the DHT, attaches itself below a parent supplied by
    that root (or becomes the root itself) and caches what it fetched.
    ``update()`` then refreshes every cached feed periodically.

    The attributes ``dht_network``, ``p2prfd_http_port`` and ``node_id``
    are not created here; the start-up code of this file assigns them on
    the instance before the reactor is started.
    """

    isLeaf = True

    # NOTE: these dicts are defined on the class, so every instance shares
    # them.  All of them are keyed by feed URL.
    cache_data = {}     # feed URL -> last data fetched for that feed
    parent_of = {}      # feed URL -> parent node this node fetches from
    ip_addr = {}        # feed URL -> local IP used to reach the feed's host
    child_left = {}     # feed URL -> address of the first registered child
    child_right = {}    # feed URL -> address of the second registered child

    # Delay handed to reactor.callLater() between two update() runs.
    update_interval = 10

    def generate_id(self):
        """Return the SHA-1 hex digest of a random float's string form."""
        digest = hashlib.sha1()
        digest.update(str(random.random()))
        return digest.hexdigest()

    def _own_addr(self, feed_url):
        """Return the (ip, port) pair this node advertises for feed_url."""
        return (self.ip_addr[feed_url], self.p2prfd_http_port)

    def _query_node(self, node, queries):
        """Send ``GET /?<queries>`` to ``node`` and return the decoded reply.

        node    -- (host, port) pair of the peer's P2PRFD HTTP service
        queries -- dict of query parameters

        The connection is always closed, including when the request fails.
        Connection and parsing errors propagate to the caller.
        """
        http_con = httplib.HTTPConnection(node[0], node[1])
        try:
            http_con.request("GET", '/?' + urllib.urlencode(queries))
            resp_str = http_con.getresponse().read()
        finally:
            http_con.close()
        # SECURITY: eval() runs whatever expression the peer sent back.
        # Peers answer with repr() of a plain dict, so a literal-only parser
        # (e.g. ast.literal_eval on Python >= 2.6) should replace this.
        return eval(resp_str)

    def insert_new_feed(self, feed_url, request=None):
        """Join the distribution tree of feed_url and cache its data.

        feed_url -- URL of the feed
        request  -- optional Twisted request; when given, the fetched data
                    is written to it and the request is finished
        """
        print('inserting new feed %s' % (feed_url))

        dht_network = self.dht_network
        root_key = 'root:' + feed_url

        def fetch_as_root():
            """Fetch from the origin server and publish this node as root."""
            data = self.get_data_from_server(feed_url)
            # tell the DHT that I am the root of this feed
            dht_network.set_value(root_key, repr(self._own_addr(feed_url)))
            return data

        def root_gotten(root):
            """Handle the DHT's answer for root_key."""
            data = ''
            if isinstance(root, str):
                # SECURITY: eval() on a value read from the DHT; any node
                # that can write this key can make us run its expression.
                root = eval(root)
                try:
                    print('getting parent')
                    parent = self.get_parent_from_root(root, feed_url)
                    print('parent = %s' % repr(parent))
                    self.insert_to_parent(
                        parent, self._own_addr(feed_url), feed_url)
                    self.parent_of[feed_url] = parent
                    data = self.get_data_from_parent(parent, feed_url)
                except Exception:
                    # The published root (or the parent it named) did not
                    # cooperate: fall back to the origin and take over.
                    data = fetch_as_root()
            elif root is None:
                data = fetch_as_root()
            else:
                data = 'should not contain this'
            if request:
                request.write(data)
                request.finish()
            # cache the data
            self.cache_data[feed_url] = data

        # Connect to the feed's host only to learn which local IP address
        # is used to reach it; nothing is sent over this socket.
        parsed = urlparse.urlparse(feed_url)
        # netloc still contains ":port" when the URL names one, which is
        # not a connectable host name, so prefer the bare hostname.
        host = parsed.hostname or parsed.netloc
        sock = socket.socket()
        try:
            sock.connect((host, parsed.port or 80))
            self.ip_addr[feed_url] = sock.getsockname()[0]
        finally:
            sock.close()

        # root_gotten is handed to get_value -- presumably called back with
        # the stored value, or None when the key is unset (confirm against
        # entangled_network).
        dht_network.get_value(root_key, root_gotten)

    def update_feed(self, feed_url):
        """Refresh the cached data of feed_url.

        A node that has a parent fetches from it; when that fails the
        parent is forgotten and the node re-joins through
        insert_new_feed().  A node without a parent fetches directly from
        the feed's server.
        """
        if feed_url not in self.parent_of:
            self.cache_data[feed_url] = self.get_data_from_server(feed_url)
            return
        parent = self.parent_of[feed_url]
        try:
            data = self.get_data_from_parent(parent, feed_url)
        except Exception:
            print('parent %s error' % (repr(parent)))
            self.cache_data[feed_url] = 'parent error'
            del self.parent_of[feed_url]
            self.insert_new_feed(feed_url)
        else:
            self.cache_data[feed_url] = data

    def ping_node(self, node):
        """Send a ``ping`` request to ``node`` ((host, port)).

        The response is not read; the call only raises when connecting or
        sending fails.
        """
        http_con = httplib.HTTPConnection(node[0], node[1])
        try:
            http_con.request("GET", '/?' + urllib.urlencode({'action': 'ping'}))
        finally:
            http_con.close()

    def check_children(self, feed_url):
        """Ping both children of feed_url and drop those that fail."""
        print('checking children')
        alive = {}
        for side, children in (('left', self.child_left),
                               ('right', self.child_right)):
            child = children.get(feed_url)
            if child:
                try:
                    self.ping_node(child)
                except Exception:
                    print('%s child %s error' % (side, repr(child)))
                    child = None
                    del children[feed_url]
            alive[side] = child
        print('check result:')
        print('left child is %s' % (repr(alive['left'])))
        print('right child is %s' % (repr(alive['right'])))

    def update(self):
        """Refresh every cached feed, then schedule the next run.

        A failure while refreshing one feed is reported and does not stop
        the remaining feeds, and the next run is scheduled in all cases so
        a single error cannot end the periodic updates.
        """
        print('updating')
        try:
            # Iterate over a snapshot: refreshing a feed writes to the dict.
            for feed_url in list(self.cache_data):
                print('updating %s' % (feed_url))
                try:
                    self.update_feed(feed_url)
                    self.check_children(feed_url)
                except Exception:
                    print('update of %s failed: %s'
                          % (feed_url, sys.exc_info()[1]))
        finally:
            twisted.internet.reactor.callLater(self.update_interval,
                                               self.update)

    def get_data_from_parent(self, parent, key):
        """Return the data of feed ``key`` as served by ``parent``.

        Only the ``data`` field of the parent's reply is used; its
        ``status`` field is not checked.
        """
        print('fetching from parent %s' % (repr(parent)))
        result = self._query_node(parent, {'action': 'get_rss', 'rss': key})
        return result['data']

    def get_data_from_server(self, addr):
        """Fetch the resource at URL ``addr`` directly and return its body."""
        parse_result = urlparse.urlparse(addr)
        print('fetching directly from %s' % (addr))
        # A URL such as "http://host" has an empty path; request "/" then.
        path = parse_result.path or '/'
        if parse_result.query:
            path += '?' + parse_result.query
        http_con = httplib.HTTPConnection(parse_result.netloc)
        try:
            http_con.request("GET", path)
            return http_con.getresponse().read()
        finally:
            http_con.close()

    def get_parent_from_root(self, root, feed_url):
        """Ask ``root`` which node a newcomer to feed_url should attach to."""
        result = self._query_node(
            root, {'action': 'get_parent', 'rss': feed_url})
        return result['parent']

    def insert_to_parent(self, parent, me, feed_url):
        """Register ``me`` as a child of ``parent`` for feed_url.

        Returns the ``status`` value of the parent's reply.
        """
        print('insert_to_parent')
        queries = {
            'action': 'insert_child',
            'rss': feed_url,
            'node_id': self.node_id,
            'addr': repr(me),
        }
        return self._query_node(parent, queries)['status']

    def render_GET(self, request):
        """Serve request.uri from the cache, or start joining its feed.

        For an uncached URL the response is completed later by
        insert_new_feed(), hence NOT_DONE_YET.
        """
        if request.uri in self.cache_data:
            print('url %s found in cache' % request.uri)
            return self.cache_data[request.uri]
        self.insert_new_feed(request.uri, request)
        return twisted.web.server.NOT_DONE_YET
class P2prfdResource(twisted.web.resource.Resource):
    """Node-to-node endpoint of a P2PRFD peer.

    Answers GET requests whose ``action`` query parameter is one of
    ``get_rss``, ``get_parent``, ``get_node_info``, ``insert_child`` or
    ``ping``.  Every reply is the ``repr()`` of a dict that always holds
    ``status`` and ``data`` plus action-specific keys.

    ``rsrc`` -- the P2prfdProxy whose state is served -- is not created
    here; the start-up code of this file assigns it on the instance.
    """

    isLeaf = True

    def get_node_info(self, node, feed_url):
        """Return the ``info`` dict ``node`` reports for feed_url.

        node -- (host, port) pair of the peer's P2PRFD HTTP service

        The connection is always closed; connection and parsing errors
        propagate to the caller.
        """
        queries = {'action': 'get_node_info', 'rss': feed_url}
        http_con = httplib.HTTPConnection(node[0], node[1])
        try:
            http_con.request("GET", '/?' + urllib.urlencode(queries))
            resp_str = http_con.getresponse().read()
        finally:
            http_con.close()
        # SECURITY: eval() runs whatever expression the peer sent back.
        # Peers answer with repr() of a plain dict, so a literal-only parser
        # (e.g. ast.literal_eval on Python >= 2.6) should replace this.
        result = eval(resp_str)
        return result['info']

    def _first(self, queries, name):
        """Return the first value of query parameter ``name``, or None."""
        values = queries.get(name)
        if not values:
            return None
        return values[0]

    def _probe_child(self, side, children, feed_url):
        """Return (child, info) for the child of feed_url in ``children``.

        side     -- 'left' or 'right', used in the error message only
        children -- the child table to look in (and to prune)

        A child whose get_node_info() call fails is removed from
        ``children``; (None, None) is returned for it and for a missing
        child.
        """
        child = children.get(feed_url)
        if not child:
            return None, None
        try:
            return child, self.get_node_info(child, feed_url)
        except Exception:
            print('%s child %s error' % (side, repr(child)))
            del children[feed_url]
            return None, None

    def _handle_get_rss(self, queries, result):
        """Fill ``result`` with the cached data of the requested feed."""
        feed_url = self._first(queries, 'rss')
        if feed_url in self.rsrc.cache_data:
            result['status'] = True
            result['data'] = self.rsrc.cache_data[feed_url]

    def _handle_get_parent(self, queries, result):
        """Name the node a newcomer to the feed should attach to.

        With two reachable children the question is passed on to the child
        reporting the smaller depth; otherwise this node has a free slot
        and names itself.
        """
        feed_url = self._first(queries, 'rss')
        if feed_url not in self.rsrc.ip_addr:
            # No local address is recorded for this feed (or no feed was
            # named), so this node cannot offer itself: status stays False.
            return
        me = (self.rsrc.ip_addr[feed_url], self.rsrc.p2prfd_http_port)
        child_left, left_info = self._probe_child(
            'left', self.rsrc.child_left, feed_url)
        child_right, right_info = self._probe_child(
            'right', self.rsrc.child_right, feed_url)
        if child_left and child_right:
            if left_info['depth'] <= right_info['depth']:
                shallower = child_left
            else:
                shallower = child_right
            parent = self.rsrc.get_parent_from_root(shallower, feed_url)
        else:
            parent = me
        result['status'] = True
        result['parent'] = parent
        print('left child is %s' % (repr(child_left)))
        print('right child is %s' % (repr(child_right)))

    def _handle_get_node_info(self, queries, result):
        """Report the depth of the subtree rooted at this node.

        An unreachable child is pruned and counted as depth 0, matching
        how ``get_parent`` treats it, so that one dead grandchild does not
        make this whole node look unreachable to its parent.
        """
        feed_url = self._first(queries, 'rss')
        if feed_url is None:
            return
        depths = []
        for side, children in (('left', self.rsrc.child_left),
                               ('right', self.rsrc.child_right)):
            info = self._probe_child(side, children, feed_url)[1]
            if info is None:
                depths.append(0)
            else:
                depths.append(info['depth'])
        depth = max(depths) + 1
        print('depth is %s' % (depth))
        result['status'] = True
        result['info'] = {'depth': depth}

    def _handle_insert_child(self, queries, result):
        """Register the caller in the first free child slot of the feed."""
        node_id = self._first(queries, 'node_id')
        feed_url = self._first(queries, 'rss')
        raw_addr = self._first(queries, 'addr')
        if node_id is None:
            return
        if node_id == self.rsrc.node_id:
            # The request carries this node's own id: the node was told to
            # attach to itself.  The process is terminated.
            print('error connecting, try a moment later')
            sys.exit()
        if feed_url is None or raw_addr is None:
            return
        # SECURITY: eval() on a query parameter lets any client that can
        # reach this port execute an arbitrary expression.  Callers send
        # repr() of an (ip, port) tuple; parse that with a literal-only
        # parser (e.g. ast.literal_eval on Python >= 2.6) instead.
        addr = eval(raw_addr)
        print('inserting child %s' % (repr(addr)))
        if feed_url not in self.rsrc.child_left:
            self.rsrc.child_left[feed_url] = addr
            result['status'] = True
        elif feed_url not in self.rsrc.child_right:
            self.rsrc.child_right[feed_url] = addr
            result['status'] = True
        else:
            # Both slots are taken.
            result['status'] = False

    def _handle_ping(self, queries, result):
        """Confirm liveness and report this node's id."""
        result['status'] = True
        result['node_id'] = self.rsrc.node_id

    def render_GET(self, request):
        """Dispatch on the ``action`` query parameter; return repr(result).

        Unknown or missing actions, and requests lacking a required
        parameter, yield ``status`` False.
        """
        parsed = urlparse.urlparse(request.uri)
        queries = cgi.parse_qs(parsed.query)
        print('queries = %s' % (queries,))
        action = self._first(queries, 'action') or ''
        result = {'status': False, 'data': 'no data found'}
        handlers = {
            'get_rss': self._handle_get_rss,
            'get_parent': self._handle_get_parent,
            'get_node_info': self._handle_get_node_info,
            'insert_child': self._handle_insert_child,
            'ping': self._handle_ping,
        }
        handler = handlers.get(action)
        if handler is not None:
            handler(queries, result)
        return repr(result)
# Start-up: build the proxy, attach it to the DHT, expose both HTTP
# services and start the periodic feed update.
#
# Arguments, as consumed below:
#   argv[1]          int handed to EntangledNetwork -- presumably the DHT
#                    port (confirm against entangled_network)
#   argv[2]          TCP port of the proxy site (P2prfdProxy)
#   argv[3]          TCP port of the node-to-node site (P2prfdResource)
#   argv[4], argv[5] optional host and port of a known node; used only
#                    when exactly five arguments are given
if len(sys.argv) < 4:
    # Without this check a short command line died with an IndexError.
    sys.stderr.write(
        'usage: %s DHT_PORT PROXY_PORT P2PRFD_PORT [KNOWN_HOST KNOWN_PORT]\n'
        % sys.argv[0])
    sys.exit(2)

rsrc = P2prfdProxy()

if len(sys.argv) == 6:
    known_node = (sys.argv[4], int(sys.argv[5]))
else:
    known_node = None

rsrc.dht_network = entangled_network.EntangledNetwork(int(sys.argv[1]),
                                                      known_node)
rsrc.p2prfd_http_port = int(sys.argv[3])
rsrc.node_id = rsrc.generate_id()
site = twisted.web.server.Site(rsrc)

# The node-to-node resource serves the proxy's state through its `rsrc`.
p2prfd_rsrc = P2prfdResource()
p2prfd_rsrc.rsrc = rsrc
p2prfd_site = twisted.web.server.Site(p2prfd_rsrc)

# First update run; update() schedules every following one itself.
update_timer = twisted.internet.reactor.callLater(rsrc.update_interval,
                                                  rsrc.update)
rsrc.update_timer = update_timer

twisted.internet.reactor.listenTCP(int(sys.argv[2]), site)
twisted.internet.reactor.listenTCP(rsrc.p2prfd_http_port, p2prfd_site)
twisted.internet.reactor.run()