smux Stream.Write may also return EOF.
[dnstt.git] / turbotunnel / remotemap.go
blob7e6dd79111239f32fcb30546fff9d2cc91e5d563
1 package turbotunnel
3 import (
4 "container/heap"
5 "net"
6 "sync"
7 "time"
// remoteRecord describes one recently seen remote peer: who it is, when we
// last heard about it, and the queue of packets waiting to be sent to it.
type remoteRecord struct {
	// Addr identifies the peer. It is also the key under which the record
	// is indexed in remoteMapInner.byAddr.
	Addr net.Addr
	// LastSeen is the time of the most recent lookup of this peer; it
	// determines the record's position in the expiry heap.
	LastSeen time.Time
	// SendQueue holds outgoing packets destined for this peer.
	SendQueue chan []byte
}
18 // RemoteMap manages a mapping of live remote peers, keyed by address, to their
19 // respective send queues. RemoteMap's functions are safe to call from multiple
20 // goroutines.
21 type RemoteMap struct {
22 // We use an inner structure to avoid exposing public heap.Interface
23 // functions to users of remoteMap.
24 inner remoteMapInner
25 // Synchronizes access to inner.
26 lock sync.Mutex
29 // NewRemoteMap creates a RemoteMap that expires peers after a timeout.
31 // If the timeout is 0, peers never expire.
33 // The timeout does not have to be kept in sync with smux's idle timeout. If a
34 // peer is removed from the map while the smux session is still live, the worst
35 // that can happen is a loss of whatever packets were in the send queue at the
36 // time. If smux later decides to send more packets to the same peer, we'll
37 // instantiate a new send queue, and if the peer is ever seen again with a
38 // matching address, we'll deliver them.
39 func NewRemoteMap(timeout time.Duration) *RemoteMap {
40 m := &RemoteMap{
41 inner: remoteMapInner{
42 byAge: make([]*remoteRecord, 0),
43 byAddr: make(map[net.Addr]int),
46 if timeout > 0 {
47 go func() {
48 for {
49 time.Sleep(timeout / 2)
50 now := time.Now()
51 m.lock.Lock()
52 m.inner.removeExpired(now, timeout)
53 m.lock.Unlock()
55 }()
57 return m
60 // SendQueue returns the send queue corresponding to addr, creating it if
61 // necessary.
62 func (m *RemoteMap) SendQueue(addr net.Addr) chan []byte {
63 m.lock.Lock()
64 defer m.lock.Unlock()
65 return m.inner.SendQueue(addr, time.Now())
68 // remoteMapInner is the inner type of RemoteMap, implementing heap.Interface.
69 // byAge is the backing store, a heap ordered by LastSeen time, to facilitate
70 // expiring old records. byAddr is a map from addresses to heap indices, to
71 // allow looking up by address. Unlike RemoteMap, remoteMapInner requires
72 // external synchonization.
73 type remoteMapInner struct {
74 byAge []*remoteRecord
75 byAddr map[net.Addr]int
78 // removeExpired removes all records whose LastSeen timestamp is more than
79 // timeout in the past.
80 func (inner *remoteMapInner) removeExpired(now time.Time, timeout time.Duration) {
81 for len(inner.byAge) > 0 && now.Sub(inner.byAge[0].LastSeen) >= timeout {
82 record := heap.Pop(inner).(*remoteRecord)
83 close(record.SendQueue)
87 // SendQueue finds the existing record corresponding to addr, or creates a new
88 // one if none exists yet. It updates the record's LastSeen time and returns its
89 // SendQueue.
90 func (inner *remoteMapInner) SendQueue(addr net.Addr, now time.Time) chan []byte {
91 var record *remoteRecord
92 i, ok := inner.byAddr[addr]
93 if ok {
94 // Found one, update its LastSeen.
95 record = inner.byAge[i]
96 record.LastSeen = now
97 heap.Fix(inner, i)
98 } else {
99 // Not found, create a new one.
100 record = &remoteRecord{
101 Addr: addr,
102 LastSeen: now,
103 SendQueue: make(chan []byte, queueSize),
105 heap.Push(inner, record)
107 return record.SendQueue
110 // heap.Interface for remoteMapInner.
112 func (inner *remoteMapInner) Len() int {
113 if len(inner.byAge) != len(inner.byAddr) {
114 panic("inconsistent remoteMap")
116 return len(inner.byAge)
119 func (inner *remoteMapInner) Less(i, j int) bool {
120 return inner.byAge[i].LastSeen.Before(inner.byAge[j].LastSeen)
123 func (inner *remoteMapInner) Swap(i, j int) {
124 inner.byAge[i], inner.byAge[j] = inner.byAge[j], inner.byAge[i]
125 inner.byAddr[inner.byAge[i].Addr] = i
126 inner.byAddr[inner.byAge[j].Addr] = j
129 func (inner *remoteMapInner) Push(x interface{}) {
130 record := x.(*remoteRecord)
131 if _, ok := inner.byAddr[record.Addr]; ok {
132 panic("duplicate address in remoteMap")
134 // Insert into byAddr map.
135 inner.byAddr[record.Addr] = len(inner.byAge)
136 // Insert into byAge slice.
137 inner.byAge = append(inner.byAge, record)
140 func (inner *remoteMapInner) Pop() interface{} {
141 n := len(inner.byAddr)
142 // Remove from byAge slice.
143 record := inner.byAge[n-1]
144 inner.byAge[n-1] = nil
145 inner.byAge = inner.byAge[:n-1]
146 // Remove from byAddr map.
147 delete(inner.byAddr, record.Addr)
148 return record