smux Stream.Write may also return EOF.
[dnstt.git] / turbotunnel / queuepacketconn.go
blob98693b101a3ac7153c0bb311e4e45815b5621222
1 package turbotunnel
3 import (
4 "net"
5 "sync"
6 "sync/atomic"
7 "time"
10 // taggedPacket is a combination of a []byte and a net.Addr, encapsulating the
11 // return type of PacketConn.ReadFrom.
12 type taggedPacket struct {
13 P []byte
14 Addr net.Addr
17 // QueuePacketConn implements net.PacketConn by storing queues of packets. There
18 // is one incoming queue (where packets are additionally tagged by the source
19 // address of the peer that sent them). There are many outgoing queues, one for
20 // each remote peer address that has been recently seen. The QueueIncoming
21 // method inserts a packet into the incoming queue, to eventually be returned by
22 // ReadFrom. WriteTo inserts a packet into an address-specific outgoing queue,
23 // which can later by accessed through the OutgoingQueue method.
24 type QueuePacketConn struct {
25 remotes *RemoteMap
26 localAddr net.Addr
27 recvQueue chan taggedPacket
28 closeOnce sync.Once
29 closed chan struct{}
30 // What error to return when the QueuePacketConn is closed.
31 err atomic.Value
34 // NewQueuePacketConn makes a new QueuePacketConn, set to track recent peers
35 // for at least a duration of timeout.
36 func NewQueuePacketConn(localAddr net.Addr, timeout time.Duration) *QueuePacketConn {
37 return &QueuePacketConn{
38 remotes: NewRemoteMap(timeout),
39 localAddr: localAddr,
40 recvQueue: make(chan taggedPacket, queueSize),
41 closed: make(chan struct{}),
45 // QueueIncoming queues and incoming packet and its source address, to be
46 // returned in a future call to ReadFrom.
47 func (c *QueuePacketConn) QueueIncoming(p []byte, addr net.Addr) {
48 select {
49 case <-c.closed:
50 // If we're closed, silently drop it.
51 return
52 default:
54 // Copy the slice so that the caller may reuse it.
55 buf := make([]byte, len(p))
56 copy(buf, p)
57 select {
58 case c.recvQueue <- taggedPacket{buf, addr}:
59 default:
60 // Drop the incoming packet if the receive queue is full.
64 // OutgoingQueue returns the queue of outgoing packets corresponding to addr,
65 // creating it if necessary. The contents of the queue will be packets that are
66 // written to the address in question using WriteTo.
67 func (c *QueuePacketConn) OutgoingQueue(addr net.Addr) <-chan []byte {
68 return c.remotes.SendQueue(addr)
71 // ReadFrom returns a packet and address previously stored by QueueIncoming.
72 func (c *QueuePacketConn) ReadFrom(p []byte) (int, net.Addr, error) {
73 select {
74 case <-c.closed:
75 return 0, nil, &net.OpError{Op: "read", Net: c.LocalAddr().Network(), Addr: c.LocalAddr(), Err: c.err.Load().(error)}
76 default:
78 select {
79 case <-c.closed:
80 return 0, nil, &net.OpError{Op: "read", Net: c.LocalAddr().Network(), Addr: c.LocalAddr(), Err: c.err.Load().(error)}
81 case packet := <-c.recvQueue:
82 return copy(p, packet.P), packet.Addr, nil
86 // WriteTo queues an outgoing packet for the given address. The queue can later
87 // be retrieved using the OutgoingQueue method.
88 func (c *QueuePacketConn) WriteTo(p []byte, addr net.Addr) (int, error) {
89 select {
90 case <-c.closed:
91 return 0, &net.OpError{Op: "write", Net: c.LocalAddr().Network(), Addr: c.LocalAddr(), Err: c.err.Load().(error)}
92 default:
94 // Copy the slice so that the caller may reuse it.
95 buf := make([]byte, len(p))
96 copy(buf, p)
97 select {
98 case c.remotes.SendQueue(addr) <- buf:
99 return len(buf), nil
100 default:
101 // Drop the outgoing packet if the send queue is full.
102 return len(buf), nil
106 // closeWithError unblocks pending operations and makes future operations fail
107 // with the given error. If err is nil, it becomes errClosedPacketConn.
108 func (c *QueuePacketConn) closeWithError(err error) error {
109 var newlyClosed bool
110 c.closeOnce.Do(func() {
111 newlyClosed = true
112 // Store the error to be returned by future PacketConn
113 // operations.
114 if err == nil {
115 err = errClosedPacketConn
117 c.err.Store(err)
118 close(c.closed)
120 if !newlyClosed {
121 return &net.OpError{Op: "close", Net: c.LocalAddr().Network(), Addr: c.LocalAddr(), Err: c.err.Load().(error)}
123 return nil
126 // Close unblocks pending operations and makes future operations fail with a
127 // "closed connection" error.
128 func (c *QueuePacketConn) Close() error {
129 return c.closeWithError(nil)
132 // LocalAddr returns the localAddr value that was passed to NewQueuePacketConn.
133 func (c *QueuePacketConn) LocalAddr() net.Addr { return c.localAddr }
135 func (c *QueuePacketConn) SetDeadline(t time.Time) error { return errNotImplemented }
136 func (c *QueuePacketConn) SetReadDeadline(t time.Time) error { return errNotImplemented }
137 func (c *QueuePacketConn) SetWriteDeadline(t time.Time) error { return errNotImplemented }