Remove an obsolete comment.
[champa.git] / champa-client / pollingpacketconn.go
blob eee4b2aaa8fc4f95b645a5586845916424a5ff3c
package main

import (
	"bytes"
	"context"
	"errors"
	"io"
	"log"
	"net"
	"time"

	"www.bamsoftware.com/git/champa.git/encapsulation"
	"www.bamsoftware.com/git/champa.git/turbotunnel"
)
const (
	// pollLoop has a poll timer that automatically sends an empty polling
	// query when a certain amount of time has elapsed without a send. The
	// poll timer is initially set to initPollDelay. It increases by a
	// factor of pollDelayMultiplier every time the poll timer expires, up
	// to a maximum of maxPollDelay. The poll timer is reset to
	// initPollDelay whenever a send occurs that is not the result of the
	// poll timer expiring.
	initPollDelay       = 1 * time.Second
	maxPollDelay        = 10 * time.Second
	pollDelayMultiplier = 2.0
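	// With these values, an idle connection polls after 1, 2, 4, and 8
	// seconds, and then every 10 seconds thereafter.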

	// How long we wait for a start-to-finish request–response exchange,
	// including reading the response body.
	pollTimeout = 30 * time.Second
)

// PollingPacketConn implements the net.PacketConn interface over an abstract
// poll function. Packets addressed to remoteAddr that are passed to WriteTo
// are batched, encapsulated, and passed to the poll function. Packets
// addressed to other remote addresses are ignored. The poll function returns
// its own batch of incoming packets, which are queued to be returned from a
// future call to ReadFrom.
type PollingPacketConn struct {
	remoteAddr net.Addr
	clientID   turbotunnel.ClientID
	ctx        context.Context
	cancel     context.CancelFunc
	// QueuePacketConn is the direct receiver of ReadFrom and WriteTo calls.
	// pollLoop removes messages from the outgoing queue that were placed
	// there by WriteTo, and inserts messages into the incoming queue to be
	// returned from ReadFrom.
	*turbotunnel.QueuePacketConn
}

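// PollFunc is the abstract poll operation used by PollingPacketConn. It sends
// the given payload (the send half of one poll exchange) and returns a reader
// over the response body (the receive half), honoring cancellation of ctx.
//
// Purely as an illustration (this sketch is not part of the original file and
// the endpoint is hypothetical), a PollFunc could be backed by a plain HTTP
// POST that carries the payload in the request body and returns encapsulated
// packets in the response body:
//
//	func httpPoll(client *http.Client, u string) PollFunc {
//		return func(ctx context.Context, payload []byte) (io.ReadCloser, error) {
//			req, err := http.NewRequestWithContext(ctx, "POST", u, bytes.NewReader(payload))
//			if err != nil {
//				return nil, err
//			}
//			resp, err := client.Do(req)
//			if err != nil {
//				return nil, err
//			}
//			if resp.StatusCode != http.StatusOK {
//				resp.Body.Close()
//				return nil, errors.New("unexpected status " + resp.Status)
//			}
//			return resp.Body, nil
//		}
//	}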
type PollFunc func(context.Context, []byte) (io.ReadCloser, error)
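
// NewPollingPacketConn creates a PollingPacketConn that exchanges packets with
// remoteAddr via the given poll function, and starts its poll loop in a
// background goroutine. The connection is identified by a freshly generated
// ClientID.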
func NewPollingPacketConn(remoteAddr net.Addr, poll PollFunc) *PollingPacketConn {
	clientID := turbotunnel.NewClientID()
	ctx, cancel := context.WithCancel(context.Background())
	c := &PollingPacketConn{
		remoteAddr:      remoteAddr,
		clientID:        clientID,
		ctx:             ctx,
		cancel:          cancel,
		QueuePacketConn: turbotunnel.NewQueuePacketConn(clientID, 0),
	}
	go func() {
		err := c.pollLoop(poll)
		if err != nil {
			log.Printf("pollLoop: %v", err)
		}
	}()
	return c
}

// Close cancels any in-progress polls and closes the underlying
// QueuePacketConn.
func (c *PollingPacketConn) Close() error {
	c.cancel()
	return c.QueuePacketConn.Close()
}
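
// pollLoop is the poll worker loop. It collects outgoing packets into batches
// prefixed with the ClientID and hands each batch to the poll function in its
// own goroutine, sending an empty polling query whenever the poll timer
// expires without any data to send.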
func (c *PollingPacketConn) pollLoop(poll PollFunc) error {
	// TODO: compute this dynamically, considering URL length and encoding
	// overhead.
	const maxPayloadLength = 5000

	pollDelay := initPollDelay
	pollTimer := time.NewTimer(pollDelay)
	for {
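		// Each poll payload begins with the ClientID, which identifies
		// this client's session to the server.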
		var payload bytes.Buffer
		payload.Write(c.clientID[:])

		var p []byte
		unstash := c.QueuePacketConn.Unstash(c.remoteAddr)
		outgoing := c.QueuePacketConn.OutgoingQueue(c.remoteAddr)
		pollTimerExpired := false
		// Block, waiting for one packet or a demand to poll. Prioritize
		// taking a packet from the stash, then taking one from the
		// outgoing queue, then finally also consider polls.
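		// Nesting selects, each with a default case, imposes this strict
		// priority; a single select would choose among ready cases at
		// random.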
		select {
		case p = <-unstash:
		default:
			select {
			case p = <-unstash:
			case p = <-outgoing:
			default:
				select {
				case p = <-unstash:
				case p = <-outgoing:
				case <-pollTimer.C:
					pollTimerExpired = true
				}
			}
		}

		if pollTimerExpired {
			// We're polling because it's been a while since we last
			// polled. Increase the poll delay.
			pollDelay = time.Duration(float64(pollDelay) * pollDelayMultiplier)
			if pollDelay > maxPollDelay {
				pollDelay = maxPollDelay
			}
		} else {
			// We're sending an actual data packet. Reset the poll
			// delay to initial.
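			// Stop the timer, draining its channel if it has already
			// fired, so that the Reset below starts a fresh interval
			// (the drain idiom from the time.Timer documentation).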
			if !pollTimer.Stop() {
				<-pollTimer.C
			}
			pollDelay = initPollDelay
		}
		pollTimer.Reset(pollDelay)

		// Grab as many more packets as are immediately available and
		// fit in maxPayloadLength. Always include the first packet,
		// even if it doesn't fit.
		first := true
		for len(p) > 0 && (first || payload.Len()+len(p) <= maxPayloadLength) {
			first = false

			// Encapsulate the packet into the payload.
			encapsulation.WriteData(&payload, p)

			select {
			case p = <-outgoing:
			default:
				p = nil
			}
		}
		if len(p) > 0 {
			// We read an actual packet, but it didn't fit under the
			// limit. Stash it so that it will be first in line for
			// the next poll.
			c.QueuePacketConn.Stash(p, c.remoteAddr)
		}
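
		// Run the poll exchange in its own goroutine, under its own
		// timeout, so that a slow response does not block batching and
		// sending the next poll.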
		go func() {
			ctx, cancel := context.WithTimeout(c.ctx, pollTimeout)
			defer cancel()
			body, err := poll(ctx, payload.Bytes())
			if err != nil {
				log.Printf("poll: %v", err)
				// TODO: perhaps self-throttle when this happens.
				return
			}
			defer body.Close()
			err = c.processIncoming(body)
			if err != nil {
				log.Printf("processIncoming: %v", err)
			}
		}()
	}
}

// processIncoming reads packets from a poll response body and feeds them to
// the incoming queue of c.QueuePacketConn.
//
// In main, we've done SetACKNoDelay on the *kcp.UDPSession. I expect this will
// cause us, the client, to ACK incoming data immediately, which means that
// whenever we receive ACKable data, we immediately do another poll (carrying an
// ACK), which is what we want anyway while we're actively downloading.
func (c *PollingPacketConn) processIncoming(body io.Reader) error {
	// Safety limit on response body length.
	lr := io.LimitReader(body, 500*1024)
	for {
		p, err := encapsulation.ReadData(lr)
		if err != nil {
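			// An EOF with the limit exhausted most likely means the
			// body was truncated by the LimitReader, so report it as
			// an error; a plain EOF is the normal end of the body.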
			if err == io.EOF && lr.(*io.LimitedReader).N == 0 {
				err = errors.New("response body too large")
			} else if err == io.EOF {
				err = nil
			}
			return err
		}

		c.QueuePacketConn.QueueIncoming(p, c.remoteAddr)
	}
}