Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 82 additions & 14 deletions http2/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package http2
import (
"bufio"
"bytes"
"compress/flate"
"compress/gzip"
"context"
"crypto/rand"
Expand Down Expand Up @@ -3076,35 +3077,102 @@ type erringRoundTripper struct{ err error }
func (rt erringRoundTripper) RoundTripErr() error { return rt.err }
func (rt erringRoundTripper) RoundTrip(*http.Request) (*http.Response, error) { return nil, rt.err }

var errConcurrentReadOnResBody = errors.New("http2: concurrent read on response body")

// gzipReader wraps a response body so it can lazily
// call gzip.NewReader on the first call to Read
// get gzip.Reader from the pool on the first call to Read.
// After Close is called it puts gzip.Reader to the pool immediately
// if there is no Read in progress or later when Read completes.
type gzipReader struct {
_ incomparable
body io.ReadCloser // underlying Response.Body
zr *gzip.Reader // lazily-initialized gzip reader
zerr error // sticky error
mu sync.Mutex // guards zr and zerr
zr *gzip.Reader // stores gzip reader from the pool between reads
zerr error // sticky gzip reader init error or sentinel value to detect concurrent read and read after close
}

func (gz *gzipReader) Read(p []byte) (n int, err error) {
type eofReader struct{}

func (eofReader) Read([]byte) (int, error) { return 0, io.EOF }
func (eofReader) ReadByte() (byte, error) { return 0, io.EOF }

var gzipPool = sync.Pool{New: func() any { return new(gzip.Reader) }}

// gzipPoolGet gets a gzip.Reader from the pool and resets it to read from r.
func gzipPoolGet(r io.Reader) (*gzip.Reader, error) {
zr := gzipPool.Get().(*gzip.Reader)
if err := zr.Reset(r); err != nil {
gzipPoolPut(zr)
return nil, err
}
return zr, nil
}

// gzipPoolPut puts a gzip.Reader back into the pool.
func gzipPoolPut(zr *gzip.Reader) {
// Reset will allocate bufio.Reader if we pass it anything
// other than a flate.Reader, so ensure that it's getting one.
var r flate.Reader = eofReader{}
zr.Reset(r)
gzipPool.Put(zr)
}

// acquire returns a gzip.Reader for reading response body.
// The reader must be released after use.
func (gz *gzipReader) acquire() (*gzip.Reader, error) {
gz.mu.Lock()
defer gz.mu.Unlock()
if gz.zerr != nil {
return 0, gz.zerr
return nil, gz.zerr
}
if gz.zr == nil {
gz.zr, err = gzip.NewReader(gz.body)
if err != nil {
gz.zerr = err
return 0, err
gz.zr, gz.zerr = gzipPoolGet(gz.body)
if gz.zerr != nil {
return nil, gz.zerr
}
}
return gz.zr.Read(p)
ret := gz.zr
gz.zr, gz.zerr = nil, errConcurrentReadOnResBody
return ret, nil
}

func (gz *gzipReader) Close() error {
if err := gz.body.Close(); err != nil {
return err
// release returns the gzip.Reader to the pool if Close was called during Read.
func (gz *gzipReader) release(zr *gzip.Reader) {
gz.mu.Lock()
defer gz.mu.Unlock()
if gz.zerr == errConcurrentReadOnResBody {
gz.zr, gz.zerr = zr, nil
} else { // fs.ErrClosed
gzipPoolPut(zr)
}
}

// close returns the gzip.Reader to the pool immediately or
// signals release to do so after Read completes.
func (gz *gzipReader) close() {
gz.mu.Lock()
defer gz.mu.Unlock()
if gz.zerr == nil && gz.zr != nil {
gzipPoolPut(gz.zr)
gz.zr = nil
}
gz.zerr = fs.ErrClosed
return nil
}

func (gz *gzipReader) Read(p []byte) (n int, err error) {
zr, err := gz.acquire()
if err != nil {
return 0, err
}
defer gz.release(zr)

return zr.Read(p)
}

func (gz *gzipReader) Close() error {
gz.close()

return gz.body.Close()
}

type errorReader struct{ err error }
Expand Down
52 changes: 52 additions & 0 deletions http2/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"bytes"
"compress/gzip"
"context"
crand "crypto/rand"
"crypto/tls"
"encoding/hex"
"errors"
Expand Down Expand Up @@ -3866,6 +3867,57 @@ func benchLargeDownloadRoundTrip(b *testing.B, frameSize uint32) {
}
}

func BenchmarkClientGzip(b *testing.B) {
disableGoroutineTracking(b)
b.ReportAllocs()

const responseSize = 1024 * 1024

var buf bytes.Buffer
gz := gzip.NewWriter(&buf)
if _, err := io.CopyN(gz, crand.Reader, responseSize); err != nil {
b.Fatal(err)
}
gz.Close()

data := buf.Bytes()
ts := newTestServer(b,
func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Encoding", "gzip")
w.Write(data)
},
optQuiet,
)

tr := &Transport{TLSClientConfig: tlsConfigInsecure}
defer tr.CloseIdleConnections()

req, err := http.NewRequest("GET", ts.URL, nil)
if err != nil {
b.Fatal(err)
}

b.ResetTimer()

for i := 0; i < b.N; i++ {
res, err := tr.RoundTrip(req)
if err != nil {
b.Fatalf("RoundTrip err = %v; want nil", err)
}
if res.StatusCode != http.StatusOK {
b.Fatalf("Response code = %v; want %v", res.StatusCode, http.StatusOK)
}
n, err := io.Copy(io.Discard, res.Body)
res.Body.Close()
if err != nil {
b.Fatalf("RoundTrip err = %v; want nil", err)
}
if n != responseSize {
b.Fatalf("RoundTrip expected %d bytes, got %d", responseSize, n)
}
}
}

// The client closes the connection just after the server got the client's HEADERS
// frame, but before the server sends its HEADERS response back. The expected
// result is an error on RoundTrip explaining the client closed the connection.
Expand Down