Remove obsolete test for POST in decodeRequest.
[champa.git] / turbotunnel / queuepacketconn.go
blobeb4df4b0622342225e54fed7b0fc7ab3bbb650b5
1 package turbotunnel
3 import (
4 "net"
5 "sync"
6 "sync/atomic"
7 "time"
10 // taggedPacket is a combination of a []byte and a net.Addr, encapsulating the
11 // return type of PacketConn.ReadFrom.
12 type taggedPacket struct {
13 P []byte
14 Addr net.Addr
17 // QueuePacketConn implements net.PacketConn by storing queues of packets. There
18 // is one incoming queue (where packets are additionally tagged by the source
19 // address of the peer that sent them). There are many outgoing queues, one for
20 // each remote peer address that has been recently seen. The QueueIncoming
21 // method inserts a packet into the incoming queue, to eventually be returned by
22 // ReadFrom. WriteTo inserts a packet into an address-specific outgoing queue,
23 // which can later by accessed through the OutgoingQueue method.
25 // Besides the outgoing queues, there is also a one-element "stash" for each
26 // remote peer address. You can stash a packet using the Stash method, and get
27 // it back later by receiving from the channel returned by Unstash. The stash is
28 // meant as a convenient place to temporarily store a single packet, such as
29 // when you've read one too many packets from the send queue and need to store
30 // the extra packet to be processed first in the next pass. It's the caller's
31 // responsibility to Unstash what they have Stashed. Calling Stash does not put
32 // the packet at the head of the send queue; if there is the possibility that a
33 // packet has been stashed, it must be checked for by calling Unstash in
34 // addition to OutgoingQueue.
35 type QueuePacketConn struct {
36 remotes *RemoteMap
37 localAddr net.Addr
38 recvQueue chan taggedPacket
39 closeOnce sync.Once
40 closed chan struct{}
41 // What error to return when the QueuePacketConn is closed.
42 err atomic.Value
45 // NewQueuePacketConn makes a new QueuePacketConn, set to track recent peers
46 // for at least a duration of timeout.
47 func NewQueuePacketConn(localAddr net.Addr, timeout time.Duration) *QueuePacketConn {
48 return &QueuePacketConn{
49 remotes: NewRemoteMap(timeout),
50 localAddr: localAddr,
51 recvQueue: make(chan taggedPacket, queueSize),
52 closed: make(chan struct{}),
56 // QueueIncoming queues and incoming packet and its source address, to be
57 // returned in a future call to ReadFrom.
58 func (c *QueuePacketConn) QueueIncoming(p []byte, addr net.Addr) {
59 select {
60 case <-c.closed:
61 // If we're closed, silently drop it.
62 return
63 default:
65 // Copy the slice so that the caller may reuse it.
66 buf := make([]byte, len(p))
67 copy(buf, p)
68 select {
69 case c.recvQueue <- taggedPacket{buf, addr}:
70 default:
71 // Drop the incoming packet if the receive queue is full.
75 // OutgoingQueue returns the queue of outgoing packets corresponding to addr,
76 // creating it if necessary. The contents of the queue will be packets that are
77 // written to the address in question using WriteTo.
78 func (c *QueuePacketConn) OutgoingQueue(addr net.Addr) <-chan []byte {
79 return c.remotes.SendQueue(addr)
82 // Stash places p in the stash for addr, if the stash is not already occupied.
83 // Returns true if the packet was placed in the stash, or false if the stash was
84 // already occupied. This method is similar to WriteTo, except that it puts the
85 // packet in the stash queue (accessible via Unstash), rather than the outgoing
86 // queue (accessible via OutgoingQueue).
87 func (c *QueuePacketConn) Stash(p []byte, addr net.Addr) bool {
88 return c.remotes.Stash(addr, p)
91 // Unstash returns the channel that represents the stash for addr.
92 func (c *QueuePacketConn) Unstash(addr net.Addr) <-chan []byte {
93 return c.remotes.Unstash(addr)
96 // ReadFrom returns a packet and address previously stored by QueueIncoming.
97 func (c *QueuePacketConn) ReadFrom(p []byte) (int, net.Addr, error) {
98 select {
99 case <-c.closed:
100 return 0, nil, &net.OpError{Op: "read", Net: c.LocalAddr().Network(), Addr: c.LocalAddr(), Err: c.err.Load().(error)}
101 default:
103 select {
104 case <-c.closed:
105 return 0, nil, &net.OpError{Op: "read", Net: c.LocalAddr().Network(), Addr: c.LocalAddr(), Err: c.err.Load().(error)}
106 case packet := <-c.recvQueue:
107 return copy(p, packet.P), packet.Addr, nil
111 // WriteTo queues an outgoing packet for the given address. The queue can later
112 // be retrieved using the OutgoingQueue method.
113 func (c *QueuePacketConn) WriteTo(p []byte, addr net.Addr) (int, error) {
114 select {
115 case <-c.closed:
116 return 0, &net.OpError{Op: "write", Net: c.LocalAddr().Network(), Addr: c.LocalAddr(), Err: c.err.Load().(error)}
117 default:
119 // Copy the slice so that the caller may reuse it.
120 buf := make([]byte, len(p))
121 copy(buf, p)
122 select {
123 case c.remotes.SendQueue(addr) <- buf:
124 return len(buf), nil
125 default:
126 // Drop the outgoing packet if the send queue is full.
127 return len(buf), nil
131 // closeWithError unblocks pending operations and makes future operations fail
132 // with the given error. If err is nil, it becomes errClosedPacketConn.
133 func (c *QueuePacketConn) closeWithError(err error) error {
134 var newlyClosed bool
135 c.closeOnce.Do(func() {
136 newlyClosed = true
137 // Store the error to be returned by future PacketConn
138 // operations.
139 if err == nil {
140 err = errClosedPacketConn
142 c.err.Store(err)
143 close(c.closed)
145 if !newlyClosed {
146 return &net.OpError{Op: "close", Net: c.LocalAddr().Network(), Addr: c.LocalAddr(), Err: c.err.Load().(error)}
148 return nil
151 // Close unblocks pending operations and makes future operations fail with a
152 // "closed connection" error.
153 func (c *QueuePacketConn) Close() error {
154 return c.closeWithError(nil)
157 // LocalAddr returns the localAddr value that was passed to NewQueuePacketConn.
158 func (c *QueuePacketConn) LocalAddr() net.Addr { return c.localAddr }
160 func (c *QueuePacketConn) SetDeadline(t time.Time) error { return errNotImplemented }
161 func (c *QueuePacketConn) SetReadDeadline(t time.Time) error { return errNotImplemented }
162 func (c *QueuePacketConn) SetWriteDeadline(t time.Time) error { return errNotImplemented }