10 // taggedPacket is a combination of a []byte and a net.Addr, encapsulating the
11 // return type of PacketConn.ReadFrom.
12 type taggedPacket
struct {
17 // QueuePacketConn implements net.PacketConn by storing queues of packets. There
18 // is one incoming queue (where packets are additionally tagged by the source
19 // address of the peer that sent them). There are many outgoing queues, one for
20 // each remote peer address that has been recently seen. The QueueIncoming
21 // method inserts a packet into the incoming queue, to eventually be returned by
22 // ReadFrom. WriteTo inserts a packet into an address-specific outgoing queue,
23 // which can later by accessed through the OutgoingQueue method.
25 // Besides the outgoing queues, there is also a one-element "stash" for each
26 // remote peer address. You can stash a packet using the Stash method, and get
27 // it back later by receiving from the channel returned by Unstash. The stash is
28 // meant as a convenient place to temporarily store a single packet, such as
29 // when you've read one too many packets from the send queue and need to store
30 // the extra packet to be processed first in the next pass. It's the caller's
31 // responsibility to Unstash what they have Stashed. Calling Stash does not put
32 // the packet at the head of the send queue; if there is the possibility that a
33 // packet has been stashed, it must be checked for by calling Unstash in
34 // addition to OutgoingQueue.
35 type QueuePacketConn
struct {
38 recvQueue
chan taggedPacket
41 // What error to return when the QueuePacketConn is closed.
45 // NewQueuePacketConn makes a new QueuePacketConn, set to track recent peers
46 // for at least a duration of timeout.
47 func NewQueuePacketConn(localAddr net
.Addr
, timeout time
.Duration
) *QueuePacketConn
{
48 return &QueuePacketConn
{
49 remotes
: NewRemoteMap(timeout
),
51 recvQueue
: make(chan taggedPacket
, queueSize
),
52 closed: make(chan struct{}),
56 // QueueIncoming queues and incoming packet and its source address, to be
57 // returned in a future call to ReadFrom.
58 func (c
*QueuePacketConn
) QueueIncoming(p
[]byte, addr net
.Addr
) {
61 // If we're closed, silently drop it.
65 // Copy the slice so that the caller may reuse it.
66 buf
:= make([]byte, len(p
))
69 case c
.recvQueue
<- taggedPacket
{buf
, addr
}:
71 // Drop the incoming packet if the receive queue is full.
75 // OutgoingQueue returns the queue of outgoing packets corresponding to addr,
76 // creating it if necessary. The contents of the queue will be packets that are
77 // written to the address in question using WriteTo.
78 func (c
*QueuePacketConn
) OutgoingQueue(addr net
.Addr
) <-chan []byte {
79 return c
.remotes
.SendQueue(addr
)
82 // Stash places p in the stash for addr, if the stash is not already occupied.
83 // Returns true if the packet was placed in the stash, or false if the stash was
84 // already occupied. This method is similar to WriteTo, except that it puts the
85 // packet in the stash queue (accessible via Unstash), rather than the outgoing
86 // queue (accessible via OutgoingQueue).
87 func (c
*QueuePacketConn
) Stash(p
[]byte, addr net
.Addr
) bool {
88 return c
.remotes
.Stash(addr
, p
)
91 // Unstash returns the channel that represents the stash for addr.
92 func (c
*QueuePacketConn
) Unstash(addr net
.Addr
) <-chan []byte {
93 return c
.remotes
.Unstash(addr
)
96 // ReadFrom returns a packet and address previously stored by QueueIncoming.
97 func (c
*QueuePacketConn
) ReadFrom(p
[]byte) (int, net
.Addr
, error
) {
100 return 0, nil, &net
.OpError
{Op
: "read", Net
: c
.LocalAddr().Network(), Addr
: c
.LocalAddr(), Err
: c
.err
.Load().(error
)}
105 return 0, nil, &net
.OpError
{Op
: "read", Net
: c
.LocalAddr().Network(), Addr
: c
.LocalAddr(), Err
: c
.err
.Load().(error
)}
106 case packet
:= <-c
.recvQueue
:
107 return copy(p
, packet
.P
), packet
.Addr
, nil
111 // WriteTo queues an outgoing packet for the given address. The queue can later
112 // be retrieved using the OutgoingQueue method.
113 func (c
*QueuePacketConn
) WriteTo(p
[]byte, addr net
.Addr
) (int, error
) {
116 return 0, &net
.OpError
{Op
: "write", Net
: c
.LocalAddr().Network(), Addr
: c
.LocalAddr(), Err
: c
.err
.Load().(error
)}
119 // Copy the slice so that the caller may reuse it.
120 buf
:= make([]byte, len(p
))
123 case c
.remotes
.SendQueue(addr
) <- buf
:
126 // Drop the outgoing packet if the send queue is full.
131 // closeWithError unblocks pending operations and makes future operations fail
132 // with the given error. If err is nil, it becomes errClosedPacketConn.
133 func (c
*QueuePacketConn
) closeWithError(err error
) error
{
135 c
.closeOnce
.Do(func() {
137 // Store the error to be returned by future PacketConn
140 err
= errClosedPacketConn
146 return &net
.OpError
{Op
: "close", Net
: c
.LocalAddr().Network(), Addr
: c
.LocalAddr(), Err
: c
.err
.Load().(error
)}
151 // Close unblocks pending operations and makes future operations fail with a
152 // "closed connection" error.
153 func (c
*QueuePacketConn
) Close() error
{
154 return c
.closeWithError(nil)
157 // LocalAddr returns the localAddr value that was passed to NewQueuePacketConn.
158 func (c
*QueuePacketConn
) LocalAddr() net
.Addr
{ return c
.localAddr
}
160 func (c
*QueuePacketConn
) SetDeadline(t time
.Time
) error
{ return errNotImplemented
}
161 func (c
*QueuePacketConn
) SetReadDeadline(t time
.Time
) error
{ return errNotImplemented
}
162 func (c
*QueuePacketConn
) SetWriteDeadline(t time
.Time
) error
{ return errNotImplemented
}