Ignore ErrClosedPipe in "copy stream←local/upstream" as well.
[champa.git] / champa-client / main.go
blob d32378c33908763e12596e7140968a96c66e9968
1 package main
3 import (
4 "context"
5 "flag"
6 "fmt"
7 "io"
8 "log"
9 "net"
10 "net/http"
11 "net/url"
12 "os"
13 "sync"
14 "time"
16 "github.com/xtaci/kcp-go/v5"
17 "github.com/xtaci/smux"
18 "www.bamsoftware.com/git/champa.git/noise"
19 "www.bamsoftware.com/git/champa.git/turbotunnel"
// idleTimeout is the value assigned to the smux KeepAliveTimeout in run.
// smux streams will be closed after this much time without receiving data.
const idleTimeout = 2 * time.Minute
25 // readKeyFromFile reads a key from a named file.
26 func readKeyFromFile(filename string) ([]byte, error) {
27 f, err := os.Open(filename)
28 if err != nil {
29 return nil, err
31 defer f.Close()
32 return noise.ReadKey(f)
35 func handle(local *net.TCPConn, sess *smux.Session, conv uint32) error {
36 stream, err := sess.OpenStream()
37 if err != nil {
38 return fmt.Errorf("session %08x opening stream: %v", conv, err)
40 defer func() {
41 log.Printf("end stream %08x:%d", conv, stream.ID())
42 stream.Close()
43 }()
44 log.Printf("begin stream %08x:%d", conv, stream.ID())
46 var wg sync.WaitGroup
47 wg.Add(2)
48 go func() {
49 defer wg.Done()
50 _, err := io.Copy(stream, local)
51 if err == io.EOF {
52 // smux Stream.Write may return io.EOF.
53 err = nil
55 if err != nil && err != io.ErrClosedPipe {
56 log.Printf("stream %08x:%d copy stream←local: %v", conv, stream.ID(), err)
58 local.CloseRead()
59 stream.Close()
60 }()
61 go func() {
62 defer wg.Done()
63 _, err := io.Copy(local, stream)
64 if err == io.EOF {
65 // smux Stream.WriteTo may return io.EOF.
66 err = nil
68 if err != nil && err != io.ErrClosedPipe {
69 log.Printf("stream %08x:%d copy local←stream: %v", conv, stream.ID(), err)
71 local.CloseWrite()
72 }()
73 wg.Wait()
75 return err
78 func run(serverURL, cacheURL *url.URL, front, localAddr string, pubkey []byte) error {
79 ln, err := net.Listen("tcp", localAddr)
80 if err != nil {
81 return err
83 defer ln.Close()
85 http.DefaultTransport.(*http.Transport).MaxConnsPerHost = 20
87 var poll PollFunc = func(ctx context.Context, p []byte) (io.ReadCloser, error) {
88 return exchangeAMP(ctx, serverURL, cacheURL, front, p)
90 pconn := NewPollingPacketConn(turbotunnel.DummyAddr{}, poll)
91 defer pconn.Close()
93 // Open a KCP conn on the PacketConn.
94 conn, err := kcp.NewConn2(turbotunnel.DummyAddr{}, nil, 0, 0, pconn)
95 if err != nil {
96 return fmt.Errorf("opening KCP conn: %v", err)
98 defer func() {
99 log.Printf("end session %08x", conn.GetConv())
100 conn.Close()
102 log.Printf("begin session %08x", conn.GetConv())
103 // Permit coalescing the payloads of consecutive sends.
104 conn.SetStreamMode(true)
105 // Disable the dynamic congestion window (limit only by the maximum of
106 // local and remote static windows).
107 conn.SetNoDelay(
108 0, // default nodelay
109 0, // default interval
110 0, // default resend
111 1, // nc=1 => congestion window off
113 // ACK received data immediately; this is good in our polling model.
114 conn.SetACKNoDelay(true)
115 conn.SetWindowSize(1024, 1024) // Default is 32, 32.
116 // TODO: We could optimize a call to conn.SetMtu here, based on a
117 // maximum URL length we want to send (such as the 8000 bytes
118 // recommended at https://datatracker.ietf.org/doc/html/rfc7230#section-3.1.1).
119 // The idea is that if we can slightly reduce the MTU from its default
120 // to permit one more packet per request, we should do it.
121 // E.g. 1400*5 = 7000, but 1320*6 = 7920.
123 // Put a Noise channel on top of the KCP conn.
124 rw, err := noise.NewClient(conn, pubkey)
125 if err != nil {
126 return err
129 // Start a smux session on the Noise channel.
130 smuxConfig := smux.DefaultConfig()
131 smuxConfig.Version = 2
132 smuxConfig.KeepAliveTimeout = idleTimeout
133 smuxConfig.MaxReceiveBuffer = 4 * 1024 * 1024 // default is 4 * 1024 * 1024
134 smuxConfig.MaxStreamBuffer = 1 * 1024 * 1024 // default is 65536
135 sess, err := smux.Client(rw, smuxConfig)
136 if err != nil {
137 return fmt.Errorf("opening smux session: %v", err)
139 defer sess.Close()
141 for {
142 local, err := ln.Accept()
143 if err != nil {
144 if err, ok := err.(net.Error); ok && err.Temporary() {
145 continue
147 return err
149 go func() {
150 defer local.Close()
151 err := handle(local.(*net.TCPConn), sess, conn.GetConv())
152 if err != nil {
153 log.Printf("handle: %v", err)
159 func main() {
160 var cache string
161 var front string
162 var pubkeyFilename string
163 var pubkeyString string
165 flag.Usage = func() {
166 fmt.Fprintf(flag.CommandLine.Output(), `Usage:
167 %[1]s [-cache CACHEURL] [-front DOMAIN] SERVERURL LOCALADDR
169 Example:
170 %[1]s -cache https://amp.cache.example/ -front amp.cache.example https://server.example/champa/ 127.0.0.1:7000
172 `, os.Args[0])
173 flag.PrintDefaults()
175 flag.StringVar(&cache, "cache", "", "URL of AMP cache (try https://cdn.ampproject.org/)")
176 flag.StringVar(&front, "front", "", "domain to domain-front HTTPS requests with (try www.google.com)")
177 flag.StringVar(&pubkeyString, "pubkey", "", fmt.Sprintf("server public key (%d hex digits)", noise.KeyLen*2))
178 flag.StringVar(&pubkeyFilename, "pubkey-file", "", "read server public key from file")
179 flag.Parse()
181 log.SetFlags(log.LstdFlags | log.LUTC)
183 if flag.NArg() != 2 {
184 flag.Usage()
185 os.Exit(1)
187 serverURL, err := url.Parse(flag.Arg(0))
188 if err != nil {
189 fmt.Fprintf(os.Stderr, "cannot parse server URL: %v\n", err)
190 os.Exit(1)
192 localAddr := flag.Arg(1)
194 var cacheURL *url.URL
195 if cache != "" {
196 cacheURL, err = url.Parse(cache)
197 if err != nil {
198 fmt.Fprintf(os.Stderr, "cannot parse AMP cache URL: %v\n", err)
199 os.Exit(1)
203 var pubkey []byte
204 if pubkeyFilename != "" && pubkeyString != "" {
205 fmt.Fprintf(os.Stderr, "only one of -pubkey and -pubkey-file may be used\n")
206 os.Exit(1)
207 } else if pubkeyFilename != "" {
208 var err error
209 pubkey, err = readKeyFromFile(pubkeyFilename)
210 if err != nil {
211 fmt.Fprintf(os.Stderr, "cannot read pubkey from file: %v\n", err)
212 os.Exit(1)
214 } else if pubkeyString != "" {
215 var err error
216 pubkey, err = noise.DecodeKey(pubkeyString)
217 if err != nil {
218 fmt.Fprintf(os.Stderr, "pubkey format error: %v\n", err)
219 os.Exit(1)
221 } else {
222 fmt.Fprintf(os.Stderr, "the -pubkey or -pubkey-file option is required\n")
223 os.Exit(1)
226 err = run(serverURL, cacheURL, front, localAddr, pubkey)
227 if err != nil {
228 log.Fatal(err)