Use a bugs.torproject.org semi-perma link.
[champa.git] / turbotunnel / remotemap.go
blobc679bfa4160ad84f1134d828ac13b1674e4911c8
1 package turbotunnel
3 import (
4 "container/heap"
5 "net"
6 "sync"
7 "time"
10 // remoteRecord is a record of a recently seen remote peer, with the time it was
11 // last seen and queues of outgoing packets.
12 type remoteRecord struct {
13 Addr net.Addr
14 LastSeen time.Time
15 SendQueue chan []byte
16 Stash chan []byte
19 // RemoteMap manages a mapping of live remote peers, keyed by address, to their
20 // respective send queues. Each peer has two queues: a primary send queue, and a
21 // "stash". The primary send queue is returned by the SendQueue method. The
22 // stash is an auxiliary one-element queue accessed using the Stash and Unstash
23 // methods. The stash is meant for use by callers that need to "unread" a packet
24 // that's already been removed from the primary send queue.
26 // RemoteMap's functions are safe to call from multiple goroutines.
27 type RemoteMap struct {
28 // We use an inner structure to avoid exposing public heap.Interface
29 // functions to users of remoteMap.
30 inner remoteMapInner
31 // Synchronizes access to inner.
32 lock sync.Mutex
35 // NewRemoteMap creates a RemoteMap that expires peers after a timeout.
37 // If the timeout is 0, peers never expire.
39 // The timeout does not have to be kept in sync with smux's idle timeout. If a
40 // peer is removed from the map while the smux session is still live, the worst
41 // that can happen is a loss of whatever packets were in the send queue at the
42 // time. If smux later decides to send more packets to the same peer, we'll
43 // instantiate a new send queue, and if the peer is ever seen again with a
44 // matching address, we'll deliver them.
45 func NewRemoteMap(timeout time.Duration) *RemoteMap {
46 m := &RemoteMap{
47 inner: remoteMapInner{
48 byAge: make([]*remoteRecord, 0),
49 byAddr: make(map[net.Addr]int),
52 if timeout > 0 {
53 go func() {
54 for {
55 time.Sleep(timeout / 2)
56 now := time.Now()
57 m.lock.Lock()
58 m.inner.removeExpired(now, timeout)
59 m.lock.Unlock()
61 }()
63 return m
66 // SendQueue returns the send queue corresponding to addr, creating it if
67 // necessary.
68 func (m *RemoteMap) SendQueue(addr net.Addr) chan []byte {
69 m.lock.Lock()
70 defer m.lock.Unlock()
71 return m.inner.Lookup(addr, time.Now()).SendQueue
74 // Stash places p in the stash corresponding to addr, if the stash is not
75 // already occupied. Returns true if the p was placed in the stash, false
76 // otherwise.
77 func (m *RemoteMap) Stash(addr net.Addr, p []byte) bool {
78 m.lock.Lock()
79 defer m.lock.Unlock()
80 select {
81 case m.inner.Lookup(addr, time.Now()).Stash <- p:
82 return true
83 default:
84 return false
88 // Unstash returns the channel that reads from the stash for addr.
89 func (m *RemoteMap) Unstash(addr net.Addr) <-chan []byte {
90 m.lock.Lock()
91 defer m.lock.Unlock()
92 return m.inner.Lookup(addr, time.Now()).Stash
95 // remoteMapInner is the inner type of RemoteMap, implementing heap.Interface.
96 // byAge is the backing store, a heap ordered by LastSeen time, to facilitate
97 // expiring old records. byAddr is a map from addresses to heap indices, to
98 // allow looking up by address. Unlike RemoteMap, remoteMapInner requires
99 // external synchonization.
100 type remoteMapInner struct {
101 byAge []*remoteRecord
102 byAddr map[net.Addr]int
105 // removeExpired removes all records whose LastSeen timestamp is more than
106 // timeout in the past.
107 func (inner *remoteMapInner) removeExpired(now time.Time, timeout time.Duration) {
108 for len(inner.byAge) > 0 && now.Sub(inner.byAge[0].LastSeen) >= timeout {
109 record := heap.Pop(inner).(*remoteRecord)
110 close(record.SendQueue)
114 // Lookup finds the existing record corresponding to addr, or creates a new
115 // one if none exists yet. It updates the record's LastSeen time and returns the
116 // record.
117 func (inner *remoteMapInner) Lookup(addr net.Addr, now time.Time) *remoteRecord {
118 var record *remoteRecord
119 i, ok := inner.byAddr[addr]
120 if ok {
121 // Found one, update its LastSeen.
122 record = inner.byAge[i]
123 record.LastSeen = now
124 heap.Fix(inner, i)
125 } else {
126 // Not found, create a new one.
127 record = &remoteRecord{
128 Addr: addr,
129 LastSeen: now,
130 SendQueue: make(chan []byte, queueSize),
131 Stash: make(chan []byte, 1),
133 heap.Push(inner, record)
135 return record
138 // heap.Interface for remoteMapInner.
140 func (inner *remoteMapInner) Len() int {
141 if len(inner.byAge) != len(inner.byAddr) {
142 panic("inconsistent remoteMap")
144 return len(inner.byAge)
147 func (inner *remoteMapInner) Less(i, j int) bool {
148 return inner.byAge[i].LastSeen.Before(inner.byAge[j].LastSeen)
151 func (inner *remoteMapInner) Swap(i, j int) {
152 inner.byAge[i], inner.byAge[j] = inner.byAge[j], inner.byAge[i]
153 inner.byAddr[inner.byAge[i].Addr] = i
154 inner.byAddr[inner.byAge[j].Addr] = j
157 func (inner *remoteMapInner) Push(x interface{}) {
158 record := x.(*remoteRecord)
159 if _, ok := inner.byAddr[record.Addr]; ok {
160 panic("duplicate address in remoteMap")
162 // Insert into byAddr map.
163 inner.byAddr[record.Addr] = len(inner.byAge)
164 // Insert into byAge slice.
165 inner.byAge = append(inner.byAge, record)
168 func (inner *remoteMapInner) Pop() interface{} {
169 n := len(inner.byAddr)
170 // Remove from byAge slice.
171 record := inner.byAge[n-1]
172 inner.byAge[n-1] = nil
173 inner.byAge = inner.byAge[:n-1]
174 // Remove from byAddr map.
175 delete(inner.byAddr, record.Addr)
176 return record