10 // taggedPacket is a combination of a []byte and a net.Addr, encapsulating the
11 // return type of PacketConn.ReadFrom.
12 type taggedPacket
struct {
17 // QueuePacketConn implements net.PacketConn by storing queues of packets. There
18 // is one incoming queue (where packets are additionally tagged by the source
19 // address of the peer that sent them). There are many outgoing queues, one for
20 // each remote peer address that has been recently seen. The QueueIncoming
21 // method inserts a packet into the incoming queue, to eventually be returned by
22 // ReadFrom. WriteTo inserts a packet into an address-specific outgoing queue,
23 // which can later be accessed through the OutgoingQueue method.
24 type QueuePacketConn
struct {
27 recvQueue
chan taggedPacket
30 // What error to return when the QueuePacketConn is closed.
34 // NewQueuePacketConn makes a new QueuePacketConn, set to track recent peers
35 // for at least a duration of timeout.
36 func NewQueuePacketConn(localAddr net
.Addr
, timeout time
.Duration
) *QueuePacketConn
{
37 return &QueuePacketConn
{
38 remotes
: NewRemoteMap(timeout
),
40 recvQueue
: make(chan taggedPacket
, queueSize
),
41 closed: make(chan struct{}),
45 // QueueIncoming queues an incoming packet and its source address, to be
46 // returned in a future call to ReadFrom.
47 func (c
*QueuePacketConn
) QueueIncoming(p
[]byte, addr net
.Addr
) {
50 // If we're closed, silently drop it.
54 // Copy the slice so that the caller may reuse it.
55 buf
:= make([]byte, len(p
))
58 case c
.recvQueue
<- taggedPacket
{buf
, addr
}:
60 // Drop the incoming packet if the receive queue is full.
64 // OutgoingQueue returns the queue of outgoing packets corresponding to addr,
65 // creating it if necessary. The contents of the queue will be packets that are
66 // written to the address in question using WriteTo.
67 func (c
*QueuePacketConn
) OutgoingQueue(addr net
.Addr
) <-chan []byte {
68 return c
.remotes
.SendQueue(addr
)
71 // ReadFrom returns a packet and address previously stored by QueueIncoming.
72 func (c
*QueuePacketConn
) ReadFrom(p
[]byte) (int, net
.Addr
, error
) {
75 return 0, nil, &net
.OpError
{Op
: "read", Net
: c
.LocalAddr().Network(), Addr
: c
.LocalAddr(), Err
: c
.err
.Load().(error
)}
80 return 0, nil, &net
.OpError
{Op
: "read", Net
: c
.LocalAddr().Network(), Addr
: c
.LocalAddr(), Err
: c
.err
.Load().(error
)}
81 case packet
:= <-c
.recvQueue
:
82 return copy(p
, packet
.P
), packet
.Addr
, nil
86 // WriteTo queues an outgoing packet for the given address. The queue can later
87 // be retrieved using the OutgoingQueue method.
88 func (c
*QueuePacketConn
) WriteTo(p
[]byte, addr net
.Addr
) (int, error
) {
91 return 0, &net
.OpError
{Op
: "write", Net
: c
.LocalAddr().Network(), Addr
: c
.LocalAddr(), Err
: c
.err
.Load().(error
)}
94 // Copy the slice so that the caller may reuse it.
95 buf
:= make([]byte, len(p
))
98 case c
.remotes
.SendQueue(addr
) <- buf
:
101 // Drop the outgoing packet if the send queue is full.
106 // closeWithError unblocks pending operations and makes future operations fail
107 // with the given error. If err is nil, it becomes errClosedPacketConn.
108 func (c
*QueuePacketConn
) closeWithError(err error
) error
{
110 c
.closeOnce
.Do(func() {
112 // Store the error to be returned by future PacketConn
115 err
= errClosedPacketConn
121 return &net
.OpError
{Op
: "close", Net
: c
.LocalAddr().Network(), Addr
: c
.LocalAddr(), Err
: c
.err
.Load().(error
)}
126 // Close unblocks pending operations and makes future operations fail with a
127 // "closed connection" error.
128 func (c
*QueuePacketConn
) Close() error
{
129 return c
.closeWithError(nil)
132 // LocalAddr returns the localAddr value that was passed to NewQueuePacketConn.
133 func (c
*QueuePacketConn
) LocalAddr() net
.Addr
{ return c
.localAddr
}
135 func (c
*QueuePacketConn
) SetDeadline(t time
.Time
) error
{ return errNotImplemented
}
136 func (c
*QueuePacketConn
) SetReadDeadline(t time
.Time
) error
{ return errNotImplemented
}
137 func (c
*QueuePacketConn
) SetWriteDeadline(t time
.Time
) error
{ return errNotImplemented
}