12 "www.bamsoftware.com/git/champa.git/encapsulation"
13 "www.bamsoftware.com/git/champa.git/turbotunnel"
17 // pollLoop has a poll timer that automatically sends an empty polling
18 // query when a certain amount of time has elapsed without a send. The
19 // poll timer is initially set to initPollDelay. It increases by a
20 // factor of pollDelayMultiplier every time the poll timer expires, up
21 // to a maximum of maxPollDelay. The poll timer is reset to
22 // initPollDelay whenever a send occurs that is not the result of the
23 // poll timer expiring.
24 initPollDelay
= 1 * time
.Second
25 maxPollDelay
= 10 * time
.Second
26 pollDelayMultiplier
= 2.0
28 // How long we wait for a start-to-finish request–response exchange,
29 // including reading the response body.
30 pollTimeout
= 30 * time
.Second
33 // PollingPacketConn implements the net.PacketConn interface over an abstract
34 // poll function. Packets addressed to remoteAddr that are passed to WriteTo are
35 // batched, encapsulated, and passed to the poll function. Packets addressed to
36 // other remote addresses are ignored. The poll function returns its own batch
37 // of incoming packets which are queued to be returned from a future call to
39 type PollingPacketConn
struct {
41 clientID turbotunnel
.ClientID
43 cancel context
.CancelFunc
44 // QueuePacketConn is the direct receiver of ReadFrom and WriteTo calls.
45 // pollLoop removes messages from the outgoing queue that were placed
46 // there by WriteTo, and inserts messages into the incoming queue to be
47 // returned from ReadFrom.
48 *turbotunnel
.QueuePacketConn
// PollFunc is the type of the poll function supplied to
// NewPollingPacketConn. pollLoop calls it with a context derived from the
// connection's context and limited by pollTimeout, and with a payload made up
// of the client ID followed by any encapsulated outgoing packets. The returned
// body is handed to processIncoming, which reads encapsulated incoming data
// from it; the returned error is logged by pollLoop.
51 type PollFunc
func(context
.Context
, []byte) (io
.ReadCloser
, error
)
53 func NewPollingPacketConn(remoteAddr net
.Addr
, poll PollFunc
) *PollingPacketConn
{
54 clientID
:= turbotunnel
.NewClientID()
55 ctx
, cancel
:= context
.WithCancel(context
.Background())
56 c
:= &PollingPacketConn
{
57 remoteAddr
: remoteAddr
,
61 QueuePacketConn
: turbotunnel
.NewQueuePacketConn(clientID
, 0),
64 err
:= c
.pollLoop(poll
)
66 log
.Printf("pollLoop: %v", err
)
72 // Close cancels any in-progress polls and closes the underlying
74 func (c
*PollingPacketConn
) Close() error
{
76 return c
.QueuePacketConn
.Close()
79 func (c
*PollingPacketConn
) pollLoop(poll PollFunc
) error
{
80 // TODO: compute this dynamically, considering URL length and encoding
82 const maxPayloadLength
= 5000
84 pollDelay
:= initPollDelay
85 pollTimer
:= time
.NewTimer(pollDelay
)
87 var payload bytes
.Buffer
88 payload
.Write(c
.clientID
[:])
91 unstash
:= c
.QueuePacketConn
.Unstash(c
.remoteAddr
)
92 outgoing
:= c
.QueuePacketConn
.OutgoingQueue(c
.remoteAddr
)
93 pollTimerExpired
:= false
94 // Block, waiting for one packet or a demand to poll. Prioritize
95 // taking a packet from the stash, then taking one from the
96 // outgoing queue, then finally also consider polls.
108 pollTimerExpired
= true
113 if pollTimerExpired
{
114 // We're polling because it's been a while since we last
115 // polled. Increase the poll delay.
116 pollDelay
= time
.Duration(float64(pollDelay
) * pollDelayMultiplier
)
117 if pollDelay
> maxPollDelay
{
118 pollDelay
= maxPollDelay
121 // We're sending an actual data packet. Reset the poll
123 if !pollTimer
.Stop() {
126 pollDelay
= initPollDelay
128 pollTimer
.Reset(pollDelay
)
130 // Grab as many more packets as are immediately available and
131 // fit in maxPayloadLength. Always include the first packet,
132 // even if it doesn't fit.
134 for len(p
) > 0 && (first || payload
.Len()+len(p
) <= maxPayloadLength
) {
137 // Encapsulate the packet into the payload.
138 encapsulation
.WriteData(&payload
, p
)
147 // We read an actual packet, but it didn't fit under the
148 // limit. Stash it so that it will be first in line for
150 c
.QueuePacketConn
.Stash(p
, c
.remoteAddr
)
154 ctx
, cancel
:= context
.WithTimeout(c
.ctx
, pollTimeout
)
156 body
, err
:= poll(ctx
, payload
.Bytes())
158 log
.Printf("poll: %v", err
)
159 // TODO: perhaps self-throttle when this happens.
163 err
= c
.processIncoming(body
)
165 log
.Printf("processIncoming: %v", err
)
171 // processIncoming reads a packet from a poll response body and feeds it to the
172 // incoming queue of c.QueuePacketConn.
174 // In main, we've done SetACKNoDelay on the *kcp.UDPSession. I expect this will
175 // cause us, the client, to ACK incoming data immediately, which means that
176 // whenever we receive ACKable data, we immediately do another poll (carrying an
177 // ACK), which is what we want anyway while we're actively downloading.
178 func (c
*PollingPacketConn
) processIncoming(body io
.Reader
) error
{
179 // Safety limit on response body length.
180 lr
:= io
.LimitReader(body
, 500*1024)
182 p
, err
:= encapsulation
.ReadData(lr
)
184 if err
== io
.EOF
&& lr
.(*io
.LimitedReader
).N
== 0 {
185 err
= errors
.New("response body too large")
186 } else if err
== io
.EOF
{
192 c
.QueuePacketConn
.QueueIncoming(p
, c
.remoteAddr
)