1 """P2PRFD HTTP proxy"""
11 import twisted
.internet
.reactor
12 import twisted
.web
.resource
13 import twisted
.web
.server
15 import entangled_network
17 class P2prfdProxy(twisted
.web
.resource
.Resource
):
18 """HTTP request handler for P2PRFD proxy"""
# insert_new_feed(feed_url, request=None): start serving `feed_url` from this
# node.  In the visible lines it records the local IP address used to reach
# the feed's server in self.ip_addr[feed_url], then looks up the DHT key
# 'root:' + feed_url and hands the answer to the nested root_gotten callback,
# which stores the fetched feed body in self.cache_data[feed_url].
# NOTE(review): the embedded line numbers skip 29-32, 38, 41, 43, 46, 48-52,
# 55-56 and 62, so the branching (presumably if/try/except/else) that chooses
# between the assignments to `data` below, and whatever is done with
# `request`, are not visible here -- confirm against the complete file.
26 def insert_new_feed(self
, feed_url
, request
=None):
27 print 'inserting new feed %s' % (feed_url
)
# Callback passed to dht_network.get_value() at the end of this method; its
# `root` argument is forwarded to get_parent_from_root(), which indexes it as
# root[0] / root[1].
28 def root_gotten(root
):
# Join below an existing root: ask it for a parent, register this node
# (local IP, p2prfd port) as that parent's child, remember the parent and
# fetch the feed from it.
33 parent
= self
.get_parent_from_root(root
, feed_url
)
34 me
= (self
.ip_addr
[feed_url
], self
.p2prfd_http_port
)
35 self
.insert_to_parent(parent
, me
, feed_url
)
36 self
.parent_of
[feed_url
] = parent
37 data
= self
.get_data_from_parent(parent
, feed_url
)
# Fetch straight from the origin server and publish this node's address
# under the root key.  NOTE(review): presumably the error path of the join
# attempt above -- the guarding statement is not visible.
39 data
= self
.get_data_from_server(feed_url
)
40 dht_network
.set_value(root_key
, repr((self
.ip_addr
[feed_url
], self
.p2prfd_http_port
)))
# Same fetch-and-publish sequence again.  NOTE(review): presumably the
# branch taken when the DHT holds no root for this feed -- confirm.
42 data
= self
.get_data_from_server(feed_url
)
44 """tell the DHT that I am the root of this feed"""
45 dht_network
.set_value(root_key
, repr((self
.ip_addr
[feed_url
], self
.p2prfd_http_port
)))
# Placeholder value; the condition under which it is used is not visible.
47 data
= 'should not contain this'
# Whatever was obtained above becomes the cached copy of the feed.
53 self
.cache_data
[feed_url
] = data
# Connect a socket towards the feed's server and record the local address
# the socket is bound to (getsockname) as this node's IP for the feed.
# NOTE(review): `host` and the default `http_port` are assigned on lines not
# visible here, and no close() of `sock` appears in the visible lines.
54 parsed
= urlparse
.urlparse(feed_url
)
57 if parsed
.port
: http_port
= parsed
.port
58 sock
= socket
.socket()
59 sock
.connect((host
, http_port
))
60 ipaddr
= sock
.getsockname()[0]
61 self
.ip_addr
[feed_url
] = ipaddr
# Look up the current root of this feed; root_gotten receives the result.
63 dht_network
= self
.dht_network
64 root_key
= 'root:' + feed_url
65 dht_network
.get_value(root_key
, root_gotten
)
def update_feed(self, feed_url):
    """Refresh the cached copy of one feed.

    A node that has a parent for ``feed_url`` re-fetches the feed from that
    parent; a node without one fetches it from the origin server.

    If the parent cannot be reached, the cache entry is set to
    ``'parent error'``, the parent is forgotten and the node re-joins the
    tree through ``insert_new_feed``.

    Args:
        feed_url: URL of the feed to refresh.
    """
    if feed_url in self.parent_of:
        parent = self.parent_of[feed_url]
        try:
            data = self.get_data_from_parent(parent, feed_url)
        except Exception:
            # Only a failed fetch may trigger the recovery path; running it
            # unconditionally would discard every successful refresh.
            print('parent %s error' % (repr(parent)))
            self.cache_data[feed_url] = 'parent error'
            del self.parent_of[feed_url]
            self.insert_new_feed(feed_url)
        else:
            self.cache_data[feed_url] = data
    else:
        self.cache_data[feed_url] = self.get_data_from_server(feed_url)


def update(self):
    """Refresh every cached feed, then schedule the next refresh.

    Runs as a reactor timer callback; it re-arms itself with
    ``self.update_interval`` on every invocation.
    """
    try:
        # Iterate over a snapshot: refreshing a feed writes to the cache.
        for feed_url in list(self.cache_data):
            print('updating %s' % (feed_url))
            self.update_feed(feed_url)
    finally:
        # Re-arm even if one refresh raised, otherwise a single failure
        # would stop periodic updates for good.
        twisted.internet.reactor.callLater(self.update_interval, self.update)
def get_data_from_parent(self, parent, key):
    """Fetch the cached copy of feed ``key`` from the ``parent`` node.

    Sends ``GET /?action=get_rss&rss=<key>`` to the parent and returns the
    ``'data'`` entry of its reply.

    Args:
        parent: ``(host, port)`` pair of the parent node.
        key: URL of the feed to fetch.

    Returns:
        The value stored under ``'data'`` in the decoded reply.
    """
    print('fetching from parent %s' % (repr(parent)))
    # The get_rss handler looks the feed up by the 'rss' parameter, so it
    # has to be sent along with the action.
    queries = {'action': 'get_rss', 'rss': key}
    http_con = httplib.HTTPConnection(parent[0], parent[1])
    try:
        http_con.request("GET", '/?' + urllib.urlencode(queries))
        resp_str = http_con.getresponse().read()
    finally:
        http_con.close()
    # SECURITY: eval() runs whatever the remote peer sent back.  The reply
    # is expected to be the repr() of a plain dict; ast.literal_eval would
    # decode that without executing code.
    result = eval(resp_str)
    # NOTE(review): result['status'] is not checked, so a parent that does
    # not hold the feed hands back its placeholder text as the feed body.
    return result['data']
def get_data_from_server(self, addr):
    """Fetch a resource directly from its origin server.

    Args:
        addr: Absolute HTTP URL of the resource.

    Returns:
        The response body as returned by ``HTTPResponse.read()``.
    """
    parse_result = urlparse.urlparse(addr)
    print('fetching directly from %s' % (addr))
    # A URL without a path ('http://host') must still request '/'.
    path = parse_result.path or '/'
    if parse_result.query:
        path += '?' + parse_result.query
    http_con = httplib.HTTPConnection(parse_result.netloc)
    try:
        http_con.request("GET", path)
        # Callers store the return value as the feed body.
        return http_con.getresponse().read()
    finally:
        http_con.close()
def get_parent_from_root(self, root, feed_url):
    """Ask the node ``root`` which node a newcomer should attach to.

    Sends ``GET /?action=get_parent&rss=<feed_url>`` to ``root``.

    Args:
        root: ``(host, port)`` pair of the node to ask.
        feed_url: URL of the feed whose tree is being joined.

    Returns:
        The value stored under ``'parent'`` in the decoded reply.
    """
    queries = {'action': 'get_parent', 'rss': feed_url}
    http_con = httplib.HTTPConnection(root[0], root[1])
    try:
        http_con.request("GET", '/?' + urllib.urlencode(queries))
        resp_str = http_con.getresponse().read()
    finally:
        http_con.close()
    # SECURITY: eval() runs whatever the remote peer sent back.  The reply
    # is expected to be the repr() of a plain dict; ast.literal_eval would
    # decode that without executing code.
    result = eval(resp_str)
    # Callers use the return value as the parent address.
    return result['parent']
def insert_to_parent(self, parent, me, feed_url):
    """Register this node as a child of ``parent`` for ``feed_url``.

    Sends ``GET /?action=insert_child&rss=<feed_url>&addr=<repr(me)>`` to
    the parent.

    Args:
        parent: ``(host, port)`` pair of the node to attach to.
        me: Address of this node; transmitted as ``repr(me)``.
        feed_url: URL of the feed whose tree is being joined.

    Returns:
        The value stored under ``'status'`` in the decoded reply.
    """
    queries = {
        'action': 'insert_child',
        'rss': feed_url,
        'addr': repr(me),
    }
    http_con = httplib.HTTPConnection(parent[0], parent[1])
    try:
        http_con.request("GET", '/?' + urllib.urlencode(queries))
        resp_str = http_con.getresponse().read()
    finally:
        http_con.close()
    # SECURITY: eval() runs whatever the remote peer sent back.  The reply
    # is expected to be the repr() of a plain dict; ast.literal_eval would
    # decode that without executing code.
    result = eval(resp_str)
    return result['status']
def render_GET(self, request):
    """Serve a proxied feed request.

    A feed that is already cached is answered immediately from
    ``self.cache_data``.  Otherwise the node starts joining the feed's
    distribution tree and the response is left pending.

    Args:
        request: The incoming request; ``request.uri`` is used as the
            feed URL.

    Returns:
        The cached feed body on a hit, or
        ``twisted.web.server.NOT_DONE_YET`` on a miss.
    """
    if request.uri in self.cache_data:
        data = self.cache_data[request.uri]
        print('url %s found in cache' % request.uri)
        # Answer from the cache; falling through would re-insert the feed
        # and leave the request pending.
        return data
    # Cache miss: the request object is handed to insert_new_feed while
    # NOT_DONE_YET tells twisted that the response is not finished yet.
    self.insert_new_feed(request.uri, request)
    return twisted.web.server.NOT_DONE_YET
148 class P2prfdResource(twisted
.web
.resource
.Resource
):
def get_node_info(self, node, feed_url):
    """Ask ``node`` for its tree information about ``feed_url``.

    Sends ``GET /?action=get_node_info&rss=<feed_url>`` to ``node``.

    Args:
        node: ``(host, port)`` pair of the node to query.
        feed_url: URL of the feed the information is about.

    Returns:
        The value stored under ``'info'`` in the decoded reply; callers
        read its ``'depth'`` entry.
    """
    queries = {'action': 'get_node_info', 'rss': feed_url}
    http_con = httplib.HTTPConnection(node[0], node[1])
    try:
        http_con.request("GET", '/?' + urllib.urlencode(queries))
        resp_str = http_con.getresponse().read()
    finally:
        http_con.close()
    # SECURITY: eval() runs whatever the remote peer sent back.  The reply
    # is expected to be the repr() of a plain dict; ast.literal_eval would
    # decode that without executing code.
    result = eval(resp_str)
    return result['info']
def render_GET(self, request):
    """Answer a node-to-node control request.

    The action is taken from the ``action`` query parameter and the feed
    from ``rss``.  Supported actions:

    * ``get_rss`` -- return the cached feed body under ``'data'``.
    * ``get_parent`` -- return under ``'parent'`` the node a newcomer should
      attach to.  This is the node itself while it has a free child slot;
      otherwise the request is delegated to the shallower child.
    * ``get_node_info`` -- return ``{'depth': n}`` under ``'info'``, where a
      node without children has depth 1.
    * ``insert_child`` -- store the address given in ``addr`` in the first
      free child slot (left, then right).

    Args:
        request: The incoming request; only ``request.uri`` is read.

    Returns:
        ``repr()`` of the result dict, which always carries ``'status'``
        and ``'data'``.  Callers decode it and index the entry they need.
    """
    parsed = urlparse.urlparse(request.uri)
    queries = cgi.parse_qs(parsed.query)
    proxy = self.rsrc
    result = {'status': False, 'data': 'no data found'}

    # A missing parameter gets the default failure reply rather than an
    # unhandled KeyError.
    action = queries.get('action', [None])[0]
    feed_url = queries.get('rss', [None])[0]
    print('action is %s' % (action))
    if feed_url is None:
        return repr(result)

    if action == 'get_rss':
        if feed_url in proxy.cache_data:
            result['status'] = True
            result['data'] = proxy.cache_data[feed_url]

    elif action == 'get_parent':
        child_left = proxy.child_left.get(feed_url)
        child_right = proxy.child_right.get(feed_url)
        me = (proxy.ip_addr[feed_url], proxy.p2prfd_http_port)
        left_info = None
        right_info = None
        # Probe each child; one that does not answer is dropped so that its
        # slot becomes available again.
        if child_left:
            try:
                left_info = self.get_node_info(child_left, feed_url)
            except Exception:
                print('left child %s error' % (repr(child_left)))
                child_left = None
                del proxy.child_left[feed_url]
        if child_right:
            try:
                right_info = self.get_node_info(child_right, feed_url)
            except Exception:
                print('right child %s error' % (repr(child_right)))
                child_right = None
                del proxy.child_right[feed_url]

        if child_left and child_right:
            # Both slots taken: descend into the shallower subtree.
            if left_info['depth'] <= right_info['depth']:
                parent = proxy.get_parent_from_root(child_left, feed_url)
            else:
                parent = proxy.get_parent_from_root(child_right, feed_url)
        else:
            # A slot is free, so the newcomer attaches to this node.
            parent = me
        result['status'] = True
        result['parent'] = parent
        print('left child is %s' % (repr(child_left)))
        print('right child is %s' % (repr(child_right)))

    elif action == 'get_node_info':
        child_left = proxy.child_left.get(feed_url)
        child_right = proxy.child_right.get(feed_url)
        left_depth = 0
        right_depth = 0
        if child_left:
            left_info = self.get_node_info(child_left, feed_url)
            left_depth = left_info['depth']
        if child_right:
            right_info = self.get_node_info(child_right, feed_url)
            right_depth = right_info['depth']
        depth = max(left_depth, right_depth) + 1
        print('depth is %s' % (depth))
        result['status'] = True
        result['info'] = {'depth': depth}

    elif action == 'insert_child':
        # SECURITY: eval() on a request parameter lets any client execute
        # code in this process.  The value is expected to be the repr() of
        # an address tuple, which ast.literal_eval would decode safely.
        addr = eval(queries['addr'][0])
        print('inserting child %s' % (repr(addr)))
        if feed_url not in proxy.child_left:
            proxy.child_left[feed_url] = addr
            result['status'] = True
        elif feed_url not in proxy.child_right:
            proxy.child_right[feed_url] = addr
            result['status'] = True
        else:
            result['status'] = False

    else:
        result['status'] = False

    return repr(result)
# Start-up script.  Command-line arguments as used in the visible lines:
#   argv[1]  integer handed to EntangledNetwork as its first argument
#            (presumably the DHT port -- confirm in entangled_network)
#   argv[2]  TCP port on which the proxy site (`site`) listens
#   argv[3]  TCP port of the node-to-node site, stored as p2prfd_http_port
#   argv[4], argv[5]  host and port of an already known node (only read when
#            exactly six argv entries are present)
# NOTE(review): the embedded line numbers skip 251-252; the creation of
# `rsrc` and the value of `known_node` when fewer arguments are given are
# not visible here.
249 if len(sys
.argv
) == 6:
250 known_node
= (sys
.argv
[4], int(sys
.argv
[5]))
# Attach the DHT network object and the node-to-node port to the proxy
# resource.
253 rsrc
.dht_network
= entangled_network
.EntangledNetwork(int(sys
.argv
[1]), known_node
)
254 rsrc
.p2prfd_http_port
= int(sys
.argv
[3])
# Two web sites: one wrapping the proxy resource itself, one wrapping a
# P2prfdResource that refers back to the proxy through its `rsrc` attribute.
255 site
= twisted
.web
.server
.Site(rsrc
)
256 p2prfd_rsrc
= P2prfdResource()
257 p2prfd_rsrc
.rsrc
= rsrc
258 p2prfd_site
= twisted
.web
.server
.Site(p2prfd_rsrc
)
# Schedule the first call of rsrc.update after rsrc.update_interval and keep
# the returned delayed-call object on the resource.
259 update_timer
= twisted
.internet
.reactor
.callLater(rsrc
.update_interval
, rsrc
.update
)
260 rsrc
.update_timer
= update_timer
# Two seconds after start-up, delete one hard-coded root key from the DHT.
# NOTE(review): looks like a leftover from local testing -- confirm whether
# it is still wanted.
261 twisted
.internet
.reactor
.callLater(2, rsrc
.dht_network
.delete_key
, 'root:http://localhost/coba.txt')
# Listen on both ports, then enter the reactor loop.
262 twisted
.internet
.reactor
.listenTCP(int(sys
.argv
[2]), site
)
263 twisted
.internet
.reactor
.listenTCP(rsrc
.p2prfd_http_port
, p2prfd_site
)
264 twisted
.internet
.reactor
.run()