#!/usr/bin/env python
# switzerland/server/Reconciliator.py

import sys
import logging
import time
import traceback
import socket as s
from threading import RLock
from binascii import hexlify

from switzerland.common import Protocol
from switzerland.common import util
from switzerland.common.Flow import print_flow_tuple

id_num = 0
id_lock = RLock()
TIMESTAMP = -1

drop_timeout = 30        # packet was dropped if not seen in this time
clock_safety_margin = 2.0

dangling_timeout = 12.0  # if this much time has elapsed since one side of a
                         # Reconciliator has shown up, the other side is
                         # probably never going to show up

mark_everything_forged = False  # for testing fi/fo context messages easily
crazy_debugging = False
hash_archival = False
hash_event_archival = hash_archival and False

log = logging.getLogger('switzerland.reconciliator')

# for debugging madness
if hash_archival:
  alice_ipids = {}
  alice_hashes = {}
  alice_flows_by_hash = {}
  bob_ipids = {}
  bob_hashes = {}
  forged_history = []
  bob_flows_by_hash = {}
  events_by_hash = {}

class Dangling(Exception):
  pass

def makebatch(timestamp, hashes, alice, rec):
  """
  A factory for the special dicts that represent our batches.
  hashes is a string of concatenated hashes.
  TIMESTAMP is a constant that must not collide with the hashes.
  """
  batch = {TIMESTAMP: timestamp}
  pos = 0
  for n in xrange(Protocol.hashes_in_batch(hashes)):
    hash = hashes[pos : pos + Protocol.hash_length]
    # debugging madness
    hash, ipid = hash[:-2], hash[-2:]
    if hash_archival:
      if alice:
        alice_ipids.setdefault(hash, []).append(ipid)
        alice_hashes.setdefault(ipid, []).append(hash)
        alice_flows_by_hash.setdefault(hash, []).append(rec)
      else:
        bob_ipids.setdefault(hash, []).append(ipid)
        bob_hashes.setdefault(ipid, []).append(hash)
        bob_flows_by_hash.setdefault(hash, []).append(rec)

    batch.setdefault(hash, 0)
    batch[hash] += 1
    pos += Protocol.hash_length
  return batch
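
# Editor's sketch, not part of the original module: suppose (hypothetically)
# Protocol.hash_length were 6.  Reporting two concatenated 6-byte "hashes"
# at t = 5.0 would then yield:
#
#   makebatch(5.0, "AAAABBCCCCDD", True, rec)
#     ==> {TIMESTAMP: 5.0, "AAAA": 1, "CCCC": 1}
#
# The trailing 2 bytes of each hash ("BB", "DD") are split off as the IP ID
# for the debugging tables, and each truncated hash is counted in the dict.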

class Reconciliator:
  """ Compare what Alice says she sent to what Bob says he received.
  Report forgeries and dropped packets. """

  def __init__(self, flow, m_tuple):
    self.lock = RLock()
    self.m_tuple = m_tuple
    self.newest_information_from_a = 0
    self.newest_information_from_b = 0
    # XXX there is some silly redundancy here, because we don't calculate
    # which side (sending / receiving) is calling this constructor, so we
    # don't know which view of the flow we have until add_link is called
    self.flow = flow
    self.src_flow = None
    self.dest_flow = None
    self.ready = False
    self.respond_to_forgeries = True  # used by Switzerland.py
    self.birthday = time.time()

    # These two structures are lists of batches.  We might have stored them
    # like this:
    #   [(timestamp, {hash1: count1, hash2: count2}), (timestamp2, {...]
    # but it's easier to delete by reference if we put the timestamp
    # _inside_ the batch dict, like this:
    #   a_to_b : [{TIMESTAMP:timestamp, hash1:count1, hash2:count2}, ...]
    self.a_to_b = []
    self.b_from_a = []
    # These are dicts for the whole queue, mapping each hash to the batches
    # that contain at least one packet with that hash, something like:
    #   sent_packets = {hash1: [a_to_b[1], a_to_b[7], a_to_b[33]]}
    self.sent_packets = {}
    self.recd_packets = {}
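
    # Editor's illustration with hypothetical values: after alice reports a
    # hash h1 twice at t=1.0 and once more at t=2.0, we would have
    #   a_to_b       == [{TIMESTAMP: 1.0, h1: 2}, {TIMESTAMP: 2.0, h1: 1}]
    #   sent_packets == {h1: [a_to_b[0], a_to_b[1]]}
    # so every batch still holding h1 can be found without scanning the list.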

    self.okay_packets = 0
    self.forged_packets = 0
    self.dropped_packets = 0
    self.finalized = False
    self.src_links = []
    self.dest_links = []
    global id_num, id_lock
    id_lock.acquire()
    self.id = id_num
    id_num += 1
    id_lock.release()
    if crazy_debugging:
      import cPickle
      # cPickle.load wants a file object, not a filename
      archives = cPickle.load(open("archives.pickle", "rb"))

  def add_link(self, link, id, f_tuple):
    "Figure out whether this link is alice or bob, and remember it"
    if f_tuple == None:
      log.error("add_link should not be called with f_tuple=None!!!!")
      return False
    self.lock.acquire()
    try:
      ip = s.inet_aton(link.peer[0])

      if ip == self.m_tuple[0]:
        self.src_links.append((link, id))
        self.src_flow = f_tuple
        if len(self.src_links) != 1:
          link.debug_note("Duplicate src_links: %s" % `self.src_links`)
      elif ip == self.m_tuple[1]:
        self.dest_links.append((link, id))
        self.dest_flow = f_tuple
        if len(self.dest_links) != 1:
          link.debug_note("Duplicate dest_links: %s" % `self.dest_links`)
      else:
        link.protocol_error("Argh, confused about links and reconciliators!\n")

      if self.dest_links and self.src_links:
        skew1 = max([l.get_clock_dispersion() for l, id in self.src_links])
        skew2 = max([l.get_clock_dispersion() for l, id in self.dest_links])
        self.max_clock_skew = (skew1 + skew2) + clock_safety_margin
        if not (self.src_flow and self.dest_flow):
          # This is debugging for "weird error"s
          log.error("Was about to ready a one-sided flow!!!")
          return False
        self.ready = True
        log.debug("We now have both sides of flow %s", print_flow_tuple(self.flow))
        return True  # have both sides
      else:
        log.debug("We currently only have one side of flow: %s", print_flow_tuple(self.flow))
        return False
    finally:
      self.lock.release()

  def leftovers(self):
    "Return a pair of the numbers of unreconciled (sent, received) packets in this flow"
    return (len(self.sent_packets), len(self.recd_packets))

  def prettyprint(self):
    """
    Looks something like this:

    CURRENT FLOW TABLE:                              okay  drop mod/frg pend      t/rx prot
    111.222.233.244:12345 >  244.233.222.211:78901 343004 10000  100001 1000/2331      icmp
    (192.168.1.100:54343)   (192.168.33.212:2333)  opening_hash:
    """
    pub_src, pub_dest = map(s.inet_ntoa, self.m_tuple[0:2])
    o_hash = self.m_tuple[2]
    # the other side is the only reliable indicator of each side's
    # public port number
    try:
      pub_src += ":" + `util.bin2int(self.dest_flow[1])`
      pub_dest += ":" + `util.bin2int(self.src_flow[3])`
    except:
      log.error("Weird error caused by flow %s (%s, %s)" % (`self.flow`,
                `self.dest_flow`, `self.src_flow`))
      log.error(traceback.format_exc())

    line1 = pub_src.center(21) + " > " + pub_dest.center(21)

    # 19 = len("192.168.1.100:65535") -- this should be okay unless
    # the private addresses & ports are rather unusual
    priv_src = priv_dest = "not firewalled".center(19)
    if self.src_links[0][0].alice_firewalled:
      priv_src = self.src_links[0][0].peers_private_ip
      priv_src += ":" + `util.bin2int(self.src_flow[1])`
    if self.dest_links[0][0].alice_firewalled:  # here, alice means bob :)
      priv_dest = self.dest_links[0][0].peers_private_ip
      priv_dest += ":" + `util.bin2int(self.dest_flow[3])`

    line2 = "(%19s) (%19s)" % (priv_src, priv_dest)

    line1 += " %6g %5g %6g %4g/%4g " % (self.okay_packets, self.dropped_packets,
             self.forged_packets, len(self.sent_packets), len(self.recd_packets))
    line1 += util.prot_name(util.bin2int(self.flow[4]))
    return line1 + "\n" + line2

  __str__ = prettyprint

  def final_judgement(self):
    """ Set the newest information from alice and bob to infinity;
    used in testcases to flag all remaining packets """
    self.lock.acquire()
    try:
      forged = self.alice_sent_flow_status(1e308)
      dropped = self.bob_sent_flow_status(1e308)
      self.finalized = True
    finally:
      self.lock.release()
    if forged:
      log.debug("Forged in judgement %s", `forged`)
    if dropped:
      log.debug("Dropped in judgement %s", `dropped`)
    return (forged, dropped)

  def alice_sent_flow_status(self, timestamp):
    """
    Called when alice reports status for a flow (e.g. that it was idle).
    This way we can know that alice didn't send a packet that bob received,
    even if alice doesn't send more packets afterwards.
    """
    self.lock.acquire()
    assert not self.finalized, 'not expecting finalized'
    try:
      try:
        assert timestamp >= self.newest_information_from_a, \
          'expecting timestamp to be monotonically increasing, %f < %f' % \
          (timestamp, self.newest_information_from_a)
      except AssertionError:
        self.monotonicity_error()
      self.newest_information_from_a = timestamp
      forged = self.__check_for_forgeries()
    finally:
      self.lock.release()
    return forged

  def bob_sent_flow_status(self, timestamp):
    """ Called when bob reports status for a flow (e.g. that it was idle) """
    self.lock.acquire()
    assert not self.finalized, 'not expecting finalized'
    try:
      try:
        assert timestamp >= self.newest_information_from_b, \
          'expecting timestamp to be monotonically increasing %f < %f' % \
          (timestamp, self.newest_information_from_b)
      except AssertionError:
        self.monotonicity_error()
      self.newest_information_from_b = timestamp
      dropped = self.check_for_drops()
    finally:
      self.lock.release()
    return dropped

  def sent_by_alice(self, timestamp, hashes):
    "Called by Switzerland.py as new sent packets are reported by Alice."
    self.lock.acquire()
    try:
      self.check_dangling()
      assert not self.finalized, 'packets arriving in finalized reconciliator'
      try:
        assert timestamp >= self.newest_information_from_a, \
          'expecting timestamp to be monotonically increasing %f < %f' % \
          (timestamp, self.newest_information_from_a)
      except AssertionError:
        self.monotonicity_error()
      self.newest_information_from_a = timestamp

      batch = makebatch(timestamp, hashes, True, self)

      if hash_archival:
        for hash in batch:
          if hash in forged_history:
            # XXX This started out as a debugging sanity check, but now
            # perhaps it opens us to a DoS attack?  How else can we convey
            # the seriousness of this condition?  We should send error
            # messages to alice and bob; alice should probably be
            # disconnected, because she's possibly a culprit...
            log.error("TIMING ERROR, sent packets arriving unacceptably late")
            sys.exit(1)

      self.__discard_from_new_batch(batch, self.recd_packets, self.b_from_a)
      # Are there any packets left in the batch?
      if len(batch) > 1:  # 0 or more packets -- TIMESTAMP takes 1 slot
        self.a_to_b.append(batch)
        for hash in batch:
          if hash != TIMESTAMP:
            self.sent_packets.setdefault(hash, []).append(batch)
      forged = self.__check_for_forgeries()
    finally:
      self.lock.release()
    return forged

  def recd_by_bob(self, timestamp, hashes):
    "Very similar to sent_by_alice, but confusing if it's factorised."
    self.lock.acquire()
    try:
      self.check_dangling()
      assert not self.finalized, 'not expecting finalized'
      try:
        assert timestamp >= self.newest_information_from_b, \
          'expecting timestamp to be monotonically increasing %f < %f' % \
          (timestamp, self.newest_information_from_b)
      except AssertionError:
        self.monotonicity_error()
      self.newest_information_from_b = timestamp

      batch = makebatch(timestamp, hashes, False, self)
      self.__discard_from_new_batch(batch, self.sent_packets, self.a_to_b)
      # Are there any packets left in the batch?
      if len(batch) > 1:  # TIMESTAMP still takes a slot
        self.b_from_a.append(batch)
        for hash in batch:
          if hash != TIMESTAMP:
            self.recd_packets.setdefault(hash, []).append(batch)
      forged = self.__check_for_forgeries()
      # XXX check for drops?
    finally:
      self.lock.release()
    return forged

  def monotonicity_error(self):
    """
    PacketListener.py in the client sometimes observes small amounts of
    non-monotonicity, but the arrival of out-of-order batches at the server
    is a much more serious condition.  It can in theory happen just through
    very bad luck on top of the small non-monotonicities, but it should be
    very rare.

    (Question: batches have a whole-batch timestamp; is that the first or
    last packet in the batch?)
    """
    if self.ready:
      log.error("Flow causing monotonicity error:\n" + self.prettyprint())
    else:
      log.error("Monotonicity error in unready flow %r (%r, %r)" % (self.flow,
                self.dest_flow, self.src_flow))
    log.error("Innards of reconciliator:\n%r\n%r\n%r\n%r" % (self.a_to_b,
              self.b_from_a, self.sent_packets, self.recd_packets))
    raise

  def check_dangling(self):
    """
    A reconciliator is said to be dangling if for some reason its Alice and
    Bob sides never get matched (i.e. self.ready is never True).  Typical
    causes might be: modifications to the opening packet that change
    opening_hash; Alice and Bob seeing different packets as the first packet
    in the flow (most likely if the flow is older than their circle
    membership); or a flow between Alice and someone other than Bob behind
    Bob's firewall.  Raise an Exception if we're deemed to be dangling.
    """
    if self.ready:
      return
    if time.time() > self.birthday + dangling_timeout:
      raise Dangling

  def __discard_from_new_batch(self, new_batch, other_dict, other_batches):
    "We have a new batch; now remove everything in it that matches."
    for hash, num in new_batch.items():
      if hash == TIMESTAMP or mark_everything_forged:
        continue
      while hash in other_dict:
        # the hash matches on the other side; discard it
        new_batch[hash] -= 1
        self.okay_packets += 1
        if hash_event_archival:
          event = "Discarded on receipt"
          events_by_hash.setdefault(hash, []).append(event)
        # cancel with the oldest instance on the other side
        other_batch = other_dict[hash][0]
        other_batch[hash] -= 1
        # the other side probably only had one of this hash in that batch:
        if other_batch[hash] == 0:
          if hash_event_archival:
            event = "No more in batch"
            events_by_hash.setdefault(hash, []).append(event)
          # so remove it:
          del other_batch[hash]
          # and remove that batch from their list of batches w/ this hash:
          del other_dict[hash][0]
          # and if that's now empty, they no longer have this hash at all:
          if other_dict[hash] == []:
            del other_dict[hash]
            if hash_event_archival:
              event = "Discard emptied other dict"
              events_by_hash.setdefault(hash, []).append(event)

        if new_batch[hash] == 0:
          del new_batch[hash]
          if hash_event_archival:
            event = "Emptied on this side"
            events_by_hash.setdefault(hash, []).append(event)
          # no copies of the hash left on our side, so we can't cancel further
          break
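
  # Editor's sketch with hypothetical values: if the new batch is
  #   {TIMESTAMP: 3.0, h1: 2}
  # and other_dict == {h1: [old_batch]} where old_batch == {TIMESTAMP: 1.0, h1: 1},
  # then one copy of h1 cancels: old_batch loses its h1 entry (and its slot
  # in other_dict), the new batch is left as {TIMESTAMP: 3.0, h1: 1}, and
  # okay_packets is incremented once.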

  def __check_for_forgeries(self):
    """
    A packet is a forgery if bob got it and we know alice didn't send it.
    We know alice didn't send a packet if
      - it isn't in sent_packets, and
      - alice has sent newer packets (or given a newer report of no activity).
    Note: assuming clocks are synchronized to within max_clock_skew seconds.
    Note: bob can't receive a packet before alice sends it.
    """
    if not self.ready:
      return []
    antideadline = self.newest_information_from_a - self.max_clock_skew
    forgeries = self.scan_batches(
      self.b_from_a, self.recd_packets, self.sent_packets,
      lambda b: b[TIMESTAMP] < antideadline
    )
    self.forged_packets += len(forgeries)
    if hash_archival:
      self.trace_forgery_event(forgeries)
    return forgeries
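
  # Editor's worked example, hypothetical numbers: if alice's newest report
  # is at t = 100.0 and max_clock_skew == 3.0, then antideadline == 97.0.
  # Any batch bob received with TIMESTAMP < 97.0 whose hashes were never
  # cancelled against sent_packets is old enough that alice cannot still
  # report sending it, so its packets are flagged as forgeries.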

  def diagnose(self, dict):
    str = "Dict of length %d " % len(dict)
    entries = [len(batch) for batch in dict.values()]
    zeroes = len([e for e in entries if e == 0])
    ones = len([e for e in entries if e == 1])
    others = len([e for e in entries if e > 1])
    str += "%d 0s, %d 1s, %d 1+s" % (zeroes, ones, others)
    return str

  def trace_forgery_event(self, forgeries):
    if forgeries:
      for f in forgeries:
        assert len(f[1]) == Protocol.hash_length - 2  # XXX transient
        forged_history.append(f[1])

      # debugging madness
      f = forgeries[0]
      b_hash = f[1]
      print hexlify(b_hash), "is a forgery"
      if hash_archival:
        ipids = bob_ipids[b_hash]
        print "IPIDs that match this forgery are:", ipids
        for ipid in ipids:
          a_hashes = alice_hashes.setdefault(ipid, [])
          print "  ", hexlify(ipid), "matches", map(hexlify, a_hashes), "from alice"
          for hash in a_hashes:
            a_ipids = alice_ipids[hash]
            print "    ", hexlify(hash), "matches", map(hexlify, a_ipids)
            if ipid in a_ipids:
              print "      (which is crazy!)"
            print "    Alice flows", alice_flows_by_hash.setdefault(hash, [])
            print "    Bob flows", bob_flows_by_hash.setdefault(hash, [])
            if hash_event_archival:
              print "    History is", events_by_hash.setdefault(hash, [])

  def scan_batches(self, batches, dict, other_dict, condition):
    """
    Proceed through the list of batches in chronological order.
    For those old enough to match "condition", remove them from our dict.
    We know they should not match the other side's dict.
    ("side" = sent | received)
    """
    results = []
    pos = 0
    for batch in batches:
      if not condition(batch):
        break
      pos += 1
      for hash, num in batch.items():
        if hash != TIMESTAMP:
          # invariant 1:
          assert hash in dict, "hash %s is not in dict %s!" % (hash, self.diagnose(dict))
          # __discard_from_new_batch should ensure this:
          assert (hash not in other_dict) or mark_everything_forged
          # invariant 2:
          list_of_occurrences = dict[hash]
          ptr = list_of_occurrences.pop(0)
          assert ptr == batch
          if list_of_occurrences == []:
            del dict[hash]
            if hash_event_archival:
              event = "Deleted in scan_batch"
              events_by_hash.setdefault(hash, []).append(event)
          for i in xrange(num):
            results.append((batch[TIMESTAMP], hash))
    del batches[0:pos]
    return results
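
  # Editor's sketch with hypothetical values: given
  #   batches == [{TIMESTAMP: 1.0, h1: 1}, {TIMESTAMP: 9.0, h2: 1}]
  # and condition == (lambda b: b[TIMESTAMP] < 5.0), only the first batch is
  # old enough, so scan_batches returns [(1.0, h1)], removes h1 from dict,
  # and truncates batches to [{TIMESTAMP: 9.0, h2: 1}].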

  def check_for_drops(self):
    """ a packet is dropped if alice sent it and we know bob didn't get it.
    we know bob didn't get a packet if it's been more than drop_timeout
    seconds since alice reported sending it """
    assert not self.finalized, 'not expecting finalized'
    self.lock.acquire()
    try:
      if not self.ready:
        return []
      dropped = self.scan_batches(
        self.a_to_b, self.sent_packets,
        self.recd_packets,
        lambda b: self.newest_information_from_b - b[TIMESTAMP] > drop_timeout)
    finally:
      self.lock.release()

    self.dropped_packets += len(dropped)
    return dropped
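
  # Editor's worked example, hypothetical numbers: with drop_timeout == 30,
  # if bob's newest report is at t = 100.0, a batch alice sent with
  # TIMESTAMP == 65.0 satisfies 100.0 - 65.0 > 30, so any of its hashes that
  # bob never matched are counted as dropped.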

# vim: et ts=2