import logging
import time
import socket as s
import traceback
import cPickle

from threading import RLock
from binascii import hexlify

from switzerland.common import Protocol
from switzerland.common import util
from switzerland.common.Flow import print_flow_tuple
drop_timeout = 30          # packet was dropped if not seen in this time
clock_safety_margin = 2.0

dangling_timeout = 12.0    # if this much time has elapsed since one side of a
                           # Reconciliator has shown up, the other side is
                           # probably never going to show up

mark_everything_forged = False  # for testing fi/fo context messages easily
crazy_debugging = False
hash_archival = False           # (assumed default; original value not shown)
hash_event_archival = hash_archival and False

# TIMESTAMP is the special key under which each batch dict stores its
# timestamp; an int can never collide with the (string) packet hashes.
# (The value -1 is an assumption; any non-string key would do.)
TIMESTAMP = -1

# id_num / id_lock hand out unique Reconciliator ids (initialisation assumed;
# see "global id_num, id_lock" in __init__ below)
id_num = 0
id_lock = RLock()

log = logging.getLogger('switzerland.reconciliator')

# for debugging madness
alice_ipids = {}
alice_hashes = {}
alice_flows_by_hash = {}
bob_ipids = {}
bob_hashes = {}
bob_flows_by_hash = {}
events_by_hash = {}
forged_history = []
class Dangling(Exception):
    pass
def makebatch(timestamp, hashes, alice, rec):
    """
    A factory for the special dicts that represent our batches.
    "hashes" is a string of concatenated hashes;
    TIMESTAMP is a constant that must not collide with the hashes.
    """
    batch = {TIMESTAMP: timestamp}
    pos = 0
    for n in xrange(Protocol.hashes_in_batch(hashes)):
        hash = hashes[pos : pos + Protocol.hash_length]
        if hash_archival:
            # debugging madness: split off the trailing IP ID and index
            # everything by hash and by ipid
            hash, ipid = hash[:-2], hash[-2:]
            if alice:
                alice_ipids.setdefault(hash, []).append(ipid)
                alice_hashes.setdefault(ipid, []).append(hash)
                alice_flows_by_hash.setdefault(hash, []).append(rec)
            else:
                bob_ipids.setdefault(hash, []).append(ipid)
                bob_hashes.setdefault(ipid, []).append(hash)
                bob_flows_by_hash.setdefault(hash, []).append(rec)
        batch.setdefault(hash, 0)
        batch[hash] += 1
        pos += Protocol.hash_length
    return batch
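
# A minimal sketch of what makebatch returns (hypothetical values, assuming
# hash_archival is False and Protocol.hash_length == 20): if the wire string
# contains hash_a twice and hash_b once,
#
#     hashes = hash_a + hash_a + hash_b        # 60 bytes
#     makebatch(1234.5, hashes, True, rec)
#         == {TIMESTAMP: 1234.5, hash_a: 2, hash_b: 1}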
77 """ Compare what Alice says she sent to what Bob says he received.
78 Report forgeries and dropped packets. """
    def __init__(self, flow, m_tuple):
        self.m_tuple = m_tuple
        self.newest_information_from_a = 0
        self.newest_information_from_b = 0
        # XXX there is some silly redundancy here, because we don't calculate
        # which side (sending / receiving) is calling this constructor, so we
        # don't know which view of the flow we have until add_link is called
        self.flow = flow
        self.src_flow = None
        self.dest_flow = None
        self.src_links = []
        self.dest_links = []
        self.ready = False
        self.respond_to_forgeries = True  # used by Switzerland.py
        self.birthday = time.time()

        # These two structures are lists of batches.  We might have stored
        # them like this:
        #     [(timestamp, {hash1: count1, hash2: count2}), (timestamp2, {..]
        # but it's easier to delete by reference if we put the timestamp
        # _inside_ the batch dict, like this:
        #     a_to_b : [{TIMESTAMP:timestamp, hash1:count1, hash2:count2}, ..]
        self.a_to_b = []
        self.b_from_a = []

        # These are dicts for the whole queue, mapping each hash to the batches
        # that contain at least one packet with that hash, something like:
        #     sent_packets = {hash1: [a_to_b[1], a_to_b[7], a_to_b[33]]}
        self.sent_packets = {}
        self.recd_packets = {}

        self.okay_packets = 0
        self.forged_packets = 0
        self.dropped_packets = 0
        self.finalized = False

        global id_num, id_lock
        id_lock.acquire()
        try:
            self.id = id_num
            id_num += 1
        finally:
            id_lock.release()

        if crazy_debugging:  # (guard assumed; debugging-only state)
            archives = cPickle.load(open("archives.pickle"))
    def add_link(self, link, id, f_tuple):
        "Figure out whether this link is alice or bob, and remember it"
        if f_tuple is None:
            log.error("add_link should not be called with f_tuple=None!!!!")

        ip = s.inet_aton(link.peer[0])
        if ip == self.m_tuple[0]:
            self.src_links.append((link, id))
            self.src_flow = f_tuple
            if len(self.src_links) != 1:
                link.debug_note("Duplicate src_links: %s" % `self.src_links`)
        elif ip == self.m_tuple[1]:
            self.dest_links.append((link, id))
            self.dest_flow = f_tuple
            if len(self.dest_links) != 1:
                link.debug_note("Duplicate dest_links: %s" % `self.dest_links`)
        else:
            link.protocol_error("Argh, confused about links and reconciliators!\n")

        if self.dest_links and self.src_links:
            # worst-case skew is the sum of each side's worst clock
            # dispersion, plus a safety margin
            skew1 = max([l.get_clock_dispersion() for l, id in self.src_links])
            skew2 = max([l.get_clock_dispersion() for l, id in self.dest_links])
            self.max_clock_skew = (skew1 + skew2) + clock_safety_margin
            if not (self.src_flow and self.dest_flow):
                # This is debugging for "weird error"s
                log.error("Was about to ready a one-sided flow!!!")
            self.ready = True
            log.debug("We now have both sides of flow %s",
                      print_flow_tuple(self.flow))
            return True  # have both sides

        log.debug("We currently only have one side of flow: %s",
                  print_flow_tuple(self.flow))
        return False
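
    # Worked example of the skew bound above (hypothetical numbers): if
    # Alice's links report clock dispersions of 0.4s and 0.7s and Bob's single
    # link reports 0.3s, then skew1 = 0.7, skew2 = 0.3, and
    # max_clock_skew = 0.7 + 0.3 + 2.0 = 3.0 seconds.  Timestamps from the two
    # sides are only ever compared modulo this uncertainty.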
167 "Return a pair of the number of unreconciled packets in this flow"
168 return (len(self
.sent_packets
), len(self
.recd_packets
))
    def prettyprint(self):
        """
        Looks something like this:

        CURRENT FLOW TABLE:                            okay  drop mod/frg  pend    prot
         111.222.233.244:12345 >  244.233.222.211:54321 343004 10000 100001 1000/2331 icmp
        (192.168.1.100:54343)   (192.168.33.212:2333)  opening_hash:
        """
        pub_src, pub_dest = map(s.inet_ntoa, self.m_tuple[0:2])
        o_hash = self.m_tuple[2]
        # the other side is the only reliable indicator of each side's
        # public port, since NATs can rewrite ports in transit
        try:
            pub_src += ":" + `util.bin2int(self.dest_flow[1])`
            pub_dest += ":" + `util.bin2int(self.src_flow[3])`
        except:
            log.error("Weird error caused by flow %s (%s, %s)" %
                      (`self.flow`, `self.dest_flow`, `self.src_flow`))
            log.error(traceback.format_exc())

        line1 = pub_src.center(21) + " > " + pub_dest.center(21)

        # 19 = len("192.168.1.100:65535") -- this should be okay unless
        # the private addresses & ports are rather unusual
        priv_src = priv_dest = "not firewalled".center(19)
        if self.src_links[0][0].alice_firewalled:
            priv_src = self.src_links[0][0].peers_private_ip
            priv_src += ":" + `util.bin2int(self.src_flow[1])`
        if self.dest_links[0][0].alice_firewalled:  # here, alice means bob :)
            priv_dest = self.dest_links[0][0].peers_private_ip
            priv_dest += ":" + `util.bin2int(self.dest_flow[3])`
        line2 = "(%19s) (%19s)" % (priv_src, priv_dest)

        line1 += " %6g %5g %6g %4g/%4g " % (self.okay_packets,
                                            self.dropped_packets,
                                            self.forged_packets,
                                            len(self.sent_packets),
                                            len(self.recd_packets))
        line1 += util.prot_name(util.bin2int(self.flow[4]))
        return line1 + "\n" + line2
    __str__ = prettyprint
    def final_judgement(self):
        """ flag newest information from alice and bob at infinity;
        used in testcases to flag all remaining packets """
        forged = self.alice_sent_flow_status(1e308)
        dropped = self.bob_sent_flow_status(1e308)
        self.finalized = True

        if forged:
            log.debug("Forged in judgement %s", `forged`)
        if dropped:
            log.debug("Dropped in judgement %s", `dropped`)
        return (forged, dropped)
    def alice_sent_flow_status(self, timestamp):
        """
        called when alice reports status for a flow (e.g. that it was idle);
        this way we can know that alice didn't send a packet that bob received,
        even if alice doesn't send more packets afterwards
        """
        assert not self.finalized, 'not expecting finalized'
        # timestamps ought to be monotonically increasing, but clock problems
        # happen in practice, so log loudly rather than assert
        if timestamp < self.newest_information_from_a:
            self.monotonicity_error()
        self.newest_information_from_a = timestamp
        forged = self.__check_for_forgeries()
        return forged
    def bob_sent_flow_status(self, timestamp):
        """ called when bob reports status for a flow (e.g. that it was idle) """
        assert not self.finalized, 'not expecting finalized'
        # timestamps ought to be monotonically increasing (see above)
        if timestamp < self.newest_information_from_b:
            self.monotonicity_error()
        self.newest_information_from_b = timestamp
        dropped = self.check_for_drops()
        return dropped
    def sent_by_alice(self, timestamp, hashes):
        "Called by Switzerland.py as new sent packets are reported by Alice."
        self.check_dangling()
        assert not self.finalized, 'packets arriving in finalized reconciliator'
        # timestamps ought to be monotonically increasing (see above)
        if timestamp < self.newest_information_from_a:
            self.monotonicity_error()
        self.newest_information_from_a = timestamp

        batch = makebatch(timestamp, hashes, True, self)
        if hash_archival:
            for hash in batch:
                if hash in forged_history:
                    # XXX This started out as a debugging sanity check, but now
                    # perhaps this opens us to a DOS attack?  How else can we
                    # convey the seriousness of this condition?  We should send
                    # error messages to alice and bob; alice should probably be
                    # disconnected because she's possibly a culprit...
                    log.error("TIMING ERROR, sent packets arriving unacceptably late")

        self.__discard_from_new_batch(batch, self.recd_packets, self.b_from_a)
        # Are there any packets left in the batch?
        if len(batch) > 1:  # at least one real packet -- TIMESTAMP takes 1 slot
            self.a_to_b.append(batch)
            for hash in batch:
                if hash != TIMESTAMP:
                    self.sent_packets.setdefault(hash, []).append(batch)
        forged = self.__check_for_forgeries()
        return forged
    def recd_by_bob(self, timestamp, hashes):
        "Very similar to sent_by_alice, but confusing if it's factorised."
        self.check_dangling()
        assert not self.finalized, 'not expecting finalized'
        # timestamps ought to be monotonically increasing (see above)
        if timestamp < self.newest_information_from_b:
            self.monotonicity_error()
        self.newest_information_from_b = timestamp

        batch = makebatch(timestamp, hashes, False, self)
        self.__discard_from_new_batch(batch, self.sent_packets, self.a_to_b)
        # Are there any packets left in the batch?
        if len(batch) > 1:  # TIMESTAMP still takes a slot
            self.b_from_a.append(batch)
            for hash in batch:
                if hash != TIMESTAMP:
                    self.recd_packets.setdefault(hash, []).append(batch)
        forged = self.__check_for_forgeries()
        # XXX check for drops here too?
        return forged
    def monotonicity_error(self):
        """
        PacketListener.py in the client sometimes observes small amounts of
        non-monotonicity, but the arrival of out-of-order batches at the
        server is a much more serious condition.  It can in theory happen
        just because of very bad luck on top of the small non-monotonicities,
        but it more likely indicates a bug or tampering.
        (Question: batches have a whole-batch timestamp; is that the first or
        last packet in the batch?)
        """
        if self.ready:
            log.error("Flow causing monotonicity error:\n" + self.prettyprint())
        else:
            log.error("Monotonicity error in unready flow %r (%r, %r)" %
                      (self.flow, self.dest_flow, self.src_flow))
        log.error("Innards of reconciliator:\n%r\n%r\n%r\n%r" %
                  (self.a_to_b, self.b_from_a,
                   self.sent_packets, self.recd_packets))
    def check_dangling(self):
        """
        A reconciliator is said to be dangling if for some reason Alice and
        Bob never get matched (i.e. self.ready is never True).  Typical causes
        are: modifications to the opening packet that change opening_hash;
        Alice and Bob seeing different packets as the first packet in the flow
        (most likely if the flow is older than their circle membership); or a
        flow between Alice and someone other than Bob behind Bob's firewall.
        Raise Dangling if we're deemed to be dangling.
        """
        if not self.ready:
            if time.time() > self.birthday + dangling_timeout:
                raise Dangling()
    def __discard_from_new_batch(self, new_batch, other_dict, other_batches):
        "We have a new batch; now remove everything in it that matches."
        for hash, num in new_batch.items():
            if hash == TIMESTAMP or mark_everything_forged:
                continue
            while hash in other_dict:
                # the hash matches on the other side; discard one copy
                new_batch[hash] -= 1
                self.okay_packets += 1
                if hash_event_archival:
                    event = "Discarded on receipt"
                    events_by_hash.setdefault(hash, []).append(event)
                # cancel with the oldest instance on the other side
                other_batch = other_dict[hash][0]
                other_batch[hash] -= 1
                # the other side probably only had one of this hash in that batch:
                if other_batch[hash] == 0:
                    if hash_event_archival:
                        event = "No more in batch"
                        events_by_hash.setdefault(hash, []).append(event)
                    del other_batch[hash]
                    # and remove that batch from their list of batches w/ this hash:
                    del other_dict[hash][0]
                    # and if that's now empty, they no longer have this hash at all:
                    if other_dict[hash] == []:
                        del other_dict[hash]
                        if hash_event_archival:
                            event = "Discard emptied other dict"
                            events_by_hash.setdefault(hash, []).append(event)
                if new_batch[hash] == 0:
                    del new_batch[hash]
                    if hash_event_archival:
                        event = "Emptied on this side"
                        events_by_hash.setdefault(hash, []).append(event)
                    # no copies of the hash left on our side, so we can't cancel further
                    break
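
    # A small worked example of the cancellation above (hypothetical values):
    # if Alice's side already recorded sent_packets == {h: [batch0]} with
    # batch0 == {TIMESTAMP: t0, h: 1}, and Bob now reports a batch
    # {TIMESTAMP: t1, h: 1}, the while loop decrements both counts, deletes h
    # from both batches and from sent_packets, and bumps okay_packets by 1 --
    # leaving only the TIMESTAMP keys behind.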
    def __check_for_forgeries(self):
        """
        a packet is a forgery if bob got it and we know alice didn't send it.
        we know alice didn't send a packet if:
          - it isn't in sent_packets, and
          - alice has sent newer packets (or given a newer report of no activity).
        note: assumes clocks are synchronized to within max_clock_skew seconds.
        note: bob can't receive a packet before alice sends it.
        """
        if not self.ready:
            return []
        antideadline = self.newest_information_from_a - self.max_clock_skew
        forgeries = self.scan_batches(
            self.b_from_a, self.recd_packets, self.sent_packets,
            lambda b: b[TIMESTAMP] < antideadline)
        self.forged_packets += len(forgeries)
        if forgeries and hash_archival:
            self.trace_forgery_event(forgeries)
        return forgeries
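
    # Timeline sketch for the forgery deadline (hypothetical numbers): with
    # max_clock_skew = 3.0 and Alice's newest report at t = 100.0, the
    # antideadline is 97.0.  A batch Bob received at t = 95.0 whose hashes
    # still have no match from Alice is flushed as forged; an unmatched batch
    # from t = 99.0 stays pending, since Alice's report of it may merely not
    # have arrived yet.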
    def diagnose(self, dict):
        "Summarize a hash -> batches dict for debugging."
        desc = "Dict of length %d " % len(dict)
        entries = [len(batch) for batch in dict.values()]
        zeroes = len([e for e in entries if e == 0])
        ones = len([e for e in entries if e == 1])
        others = len([e for e in entries if e > 1])
        desc += "%d 0s, %d 1s, %d 1+s" % (zeroes, ones, others)
        return desc
    def trace_forgery_event(self, forgeries):
        "Debugging madness: cross-reference forged hashes against the IPID tables."
        for f in forgeries:
            assert len(f[1]) == Protocol.hash_length - 2  # XXX transient
            forged_history.append(f[1])

            b_hash = f[1]
            print hexlify(b_hash), "is a forgery"
            ipids = bob_ipids[b_hash]
            print "IPIDs that match this forgery are:", ipids
            for ipid in ipids:
                a_hashes = alice_hashes.setdefault(ipid, [])
                print " ", hexlify(ipid), "matches", map(hexlify, a_hashes), "from alice"
                for hash in a_hashes:
                    a_ipids = alice_ipids[hash]
                    print "   ", hexlify(hash), "matches", map(hexlify, a_ipids)
                    if hash == b_hash:  # (guard assumed: a match from alice
                                        # should have cancelled this packet)
                        print "   (which is crazy!)"
                    print "   Alice flows", alice_flows_by_hash.setdefault(hash, [])
                    print "   Bob flows", bob_flows_by_hash.setdefault(hash, [])
                    if hash_event_archival:
                        print "   History is", events_by_hash.setdefault(hash, [])
    def scan_batches(self, batches, dict, other_dict, condition):
        """
        Proceed through the list of batches in chronological order.
        For those old enough to match "condition", remove them from our dict.
        We know they should not match the other side's dict.
        ("side" = sent | received)
        """
        results = []
        finished_batches = 0
        for batch in batches:
            if not condition(batch):
                break
            finished_batches += 1
            for hash, num in batch.items():
                if hash != TIMESTAMP:
                    assert hash in dict, \
                        "hash %s is not in dict %s!" % (hash, self.diagnose(dict))
                    # __discard_from_new_batch should ensure this:
                    assert (hash not in other_dict) or mark_everything_forged

                    list_of_occurrences = dict[hash]
                    list_of_occurrences.pop(0)
                    if list_of_occurrences == []:
                        del dict[hash]
                        if hash_event_archival:
                            event = "Deleted in scan_batch"
                            events_by_hash.setdefault(hash, []).append(event)
                    for i in xrange(num):
                        results.append((batch[TIMESTAMP], hash))
        # these batches are fully processed; drop them from the queue
        del batches[:finished_batches]
        return results
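
    # The two callers of scan_batches mirror each other: __check_for_forgeries
    # (above) walks b_from_a with the condition "older than Alice's newest
    # report minus max_clock_skew", while check_for_drops (below) walks a_to_b
    # with "older than drop_timeout relative to Bob's newest report".  Each
    # (timestamp, hash) pair returned is one unmatched packet.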
    def check_for_drops(self):
        """ a packet is dropped if alice sent it and we know bob didn't get it.
        we know bob didn't get a packet if it's been more than drop_timeout
        seconds since alice reported sending it """
        assert not self.finalized, 'not expecting finalized'
        if not self.ready:
            return []
        dropped = self.scan_batches(
            self.a_to_b, self.sent_packets, self.recd_packets,
            lambda b: self.newest_information_from_b - b[TIMESTAMP] > drop_timeout)
        self.dropped_packets += len(dropped)
        return dropped
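
    # Worked example of the drop deadline (hypothetical numbers): with
    # drop_timeout = 30 and Bob's newest report at t = 500.0, a batch Alice
    # sent at TIMESTAMP 460.0 (500.0 - 460.0 = 40.0 > 30) whose hashes Bob
    # never matched is flushed as dropped, while an unmatched batch from
    # t = 480.0 (20.0 < 30) keeps waiting.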