Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,12 @@ import (
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/idle"
iresolver "google.golang.org/grpc/internal/resolver"
"google.golang.org/grpc/internal/stats"
istats "google.golang.org/grpc/internal/stats"
"google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/status"

_ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin.
Expand Down Expand Up @@ -210,7 +211,8 @@ func NewClient(target string, opts ...DialOption) (conn *ClientConn, err error)
cc.csMgr = newConnectivityStateManager(cc.ctx, cc.channelz)
cc.pickerWrapper = newPickerWrapper()

cc.metricsRecorderList = stats.NewMetricsRecorderList(cc.dopts.copts.StatsHandlers)
cc.metricsRecorderList = istats.NewMetricsRecorderList(cc.dopts.copts.StatsHandlers)
cc.statsHandler = istats.NewCombinedHandler(cc.dopts.copts.StatsHandlers...)

cc.initIdleStateLocked() // Safe to call without the lock, since nothing else has a reference to cc.
cc.idlenessMgr = idle.NewManager((*idler)(cc), cc.dopts.idleTimeout)
Expand Down Expand Up @@ -621,7 +623,8 @@ type ClientConn struct {
channelz *channelz.Channel // Channelz object.
resolverBuilder resolver.Builder // See initParsedTargetAndResolverBuilder().
idlenessMgr *idle.Manager
metricsRecorderList *stats.MetricsRecorderList
metricsRecorderList *istats.MetricsRecorderList
statsHandler stats.Handler

// The following provide their own synchronization, and therefore don't
// require cc.mu to be held to access them.
Expand Down
70 changes: 70 additions & 0 deletions internal/stats/stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
*
* Copyright 2025 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package stats

import (
"context"

"google.golang.org/grpc/stats"
)

type combinedHandler struct {
handlers []stats.Handler
}

// NewCombinedHandler combines multiple stats.Handlers into a single handler.
//
// It returns nil if no handlers are provided. If only one handler is
// provided, it is returned directly without wrapping.
func NewCombinedHandler(handlers ...stats.Handler) stats.Handler {
switch len(handlers) {
case 0:
return nil
case 1:
return handlers[0]
default:
return &combinedHandler{handlers: handlers}
}
}

func (ch *combinedHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
for _, h := range ch.handlers {
ctx = h.TagRPC(ctx, info)
}
return ctx
}

func (ch *combinedHandler) HandleRPC(ctx context.Context, stats stats.RPCStats) {
for _, h := range ch.handlers {
h.HandleRPC(ctx, stats)
}
}

func (ch *combinedHandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context {
for _, h := range ch.handlers {
ctx = h.TagConn(ctx, info)
}
return ctx
}

func (ch *combinedHandler) HandleConn(ctx context.Context, stats stats.ConnStats) {
for _, h := range ch.handlers {
h.HandleConn(ctx, stats)
}
}
30 changes: 13 additions & 17 deletions internal/transport/handler_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ import (
// NewServerHandlerTransport returns a ServerTransport handling gRPC from
// inside an http.Handler, or writes an HTTP error to w and returns an error.
// It requires that the http Server supports HTTP/2.
func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats []stats.Handler, bufferPool mem.BufferPool) (ServerTransport, error) {
func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats stats.Handler, bufferPool mem.BufferPool) (ServerTransport, error) {
if r.Method != http.MethodPost {
w.Header().Set("Allow", http.MethodPost)
msg := fmt.Sprintf("invalid gRPC request method %q", r.Method)
Expand Down Expand Up @@ -170,7 +170,7 @@ type serverHandlerTransport struct {
// TODO make sure this is consistent across handler_server and http2_server
contentSubtype string

stats []stats.Handler
stats stats.Handler
logger *grpclog.PrefixLogger

bufferPool mem.BufferPool
Expand Down Expand Up @@ -274,15 +274,13 @@ func (ht *serverHandlerTransport) writeStatus(s *ServerStream, st *status.Status
}
})

if err == nil { // transport has not been closed
if err == nil && ht.stats != nil { // transport has not been closed
// Note: The trailer fields are compressed with hpack after this call returns.
// No WireLength field is set here.
s.hdrMu.Lock()
for _, sh := range ht.stats {
sh.HandleRPC(s.Context(), &stats.OutTrailer{
Trailer: s.trailer.Copy(),
})
}
ht.stats.HandleRPC(s.Context(), &stats.OutTrailer{
Trailer: s.trailer.Copy(),
})
s.hdrMu.Unlock()
}
ht.Close(errors.New("finished writing status"))
Expand Down Expand Up @@ -374,15 +372,13 @@ func (ht *serverHandlerTransport) writeHeader(s *ServerStream, md metadata.MD) e
ht.rw.(http.Flusher).Flush()
})

if err == nil {
for _, sh := range ht.stats {
// Note: The header fields are compressed with hpack after this call returns.
// No WireLength field is set here.
sh.HandleRPC(s.Context(), &stats.OutHeader{
Header: md.Copy(),
Compression: s.sendCompress,
})
}
if err == nil && ht.stats != nil {
// Note: The header fields are compressed with hpack after this call returns.
// No WireLength field is set here.
ht.stats.HandleRPC(s.Context(), &stats.OutHeader{
Header: md.Copy(),
Compression: s.sendCompress,
})
}
return err
}
Expand Down
6 changes: 3 additions & 3 deletions internal/transport/handler_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ func (h *mockStatsHandler) TagConn(ctx context.Context, _ *stats.ConnTagInfo) co
func (h *mockStatsHandler) HandleConn(context.Context, stats.ConnStats) {
}

func newHandleStreamTest(t *testing.T, statsHandlers []stats.Handler) *handleStreamTest {
func newHandleStreamTest(t *testing.T, statsHandler stats.Handler) *handleStreamTest {
bodyr, bodyw := io.Pipe()
req := &http.Request{
ProtoMajor: 2,
Expand All @@ -280,7 +280,7 @@ func newHandleStreamTest(t *testing.T, statsHandlers []stats.Handler) *handleStr
Body: bodyr,
}
rw := newTestHandlerResponseWriter().(testHandlerResponseWriter)
ht, err := NewServerHandlerTransport(rw, req, statsHandlers, mem.DefaultBufferPool())
ht, err := NewServerHandlerTransport(rw, req, statsHandler, mem.DefaultBufferPool())
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -555,7 +555,7 @@ func (s) TestHandlerTransport_HandleStreams_StatsHandlers(t *testing.T) {
statsHandler := &mockStatsHandler{
rpcStatsCh: make(chan stats.RPCStats, 2),
}
hst := newHandleStreamTest(t, []stats.Handler{statsHandler})
hst := newHandleStreamTest(t, statsHandler)
handleStream := func(s *ServerStream) {
if err := s.SendHeader(metadata.New(map[string]string{})); err != nil {
t.Error(err)
Expand Down
59 changes: 26 additions & 33 deletions internal/transport/http2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"google.golang.org/grpc/internal/grpcutil"
imetadata "google.golang.org/grpc/internal/metadata"
"google.golang.org/grpc/internal/proxyattributes"
istats "google.golang.org/grpc/internal/stats"
istatus "google.golang.org/grpc/internal/status"
isyscall "google.golang.org/grpc/internal/syscall"
"google.golang.org/grpc/internal/transport/networktype"
Expand Down Expand Up @@ -105,7 +106,7 @@ type http2Client struct {
kp keepalive.ClientParameters
keepaliveEnabled bool

statsHandlers []stats.Handler
statsHandler stats.Handler

initialWindowSize int32

Expand Down Expand Up @@ -342,7 +343,7 @@ func NewHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
isSecure: isSecure,
perRPCCreds: perRPCCreds,
kp: kp,
statsHandlers: opts.StatsHandlers,
statsHandler: istats.NewCombinedHandler(opts.StatsHandlers...),
initialWindowSize: initialWindowSize,
nextID: 1,
maxConcurrentStreams: defaultMaxStreamsClient,
Expand Down Expand Up @@ -386,15 +387,14 @@ func NewHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
updateFlowControl: t.updateFlowControl,
}
}
for _, sh := range t.statsHandlers {
t.ctx = sh.TagConn(t.ctx, &stats.ConnTagInfo{
if t.statsHandler != nil {
t.ctx = t.statsHandler.TagConn(t.ctx, &stats.ConnTagInfo{
RemoteAddr: t.remoteAddr,
LocalAddr: t.localAddr,
})
connBegin := &stats.ConnBegin{
t.statsHandler.HandleConn(t.ctx, &stats.ConnBegin{
Client: true,
}
sh.HandleConn(t.ctx, connBegin)
})
}
if t.keepaliveEnabled {
t.kpDormancyCond = sync.NewCond(&t.mu)
Expand Down Expand Up @@ -905,27 +905,23 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*ClientS
return nil, &NewStreamError{Err: ErrConnClosing, AllowTransparentRetry: true}
}
}
if len(t.statsHandlers) != 0 {
if t.statsHandler != nil {
header, ok := metadata.FromOutgoingContext(ctx)
if ok {
header.Set("user-agent", t.userAgent)
} else {
header = metadata.Pairs("user-agent", t.userAgent)
}
for _, sh := range t.statsHandlers {
// Note: The header fields are compressed with hpack after this call returns.
// No WireLength field is set here.
// Note: Creating a new stats object to prevent pollution.
outHeader := &stats.OutHeader{
Client: true,
FullMethod: callHdr.Method,
RemoteAddr: t.remoteAddr,
LocalAddr: t.localAddr,
Compression: callHdr.SendCompress,
Header: header,
}
sh.HandleRPC(s.ctx, outHeader)
}
// Note: The header fields are compressed with hpack after this call returns.
// No WireLength field is set here.
t.statsHandler.HandleRPC(s.ctx, &stats.OutHeader{
Client: true,
FullMethod: callHdr.Method,
RemoteAddr: t.remoteAddr,
LocalAddr: t.localAddr,
Compression: callHdr.SendCompress,
Header: header,
})
}
if transportDrainRequired {
if t.logger.V(logLevel) {
Expand Down Expand Up @@ -1066,11 +1062,10 @@ func (t *http2Client) Close(err error) {
for _, s := range streams {
t.closeStream(s, err, false, http2.ErrCodeNo, st, nil, false)
}
for _, sh := range t.statsHandlers {
connEnd := &stats.ConnEnd{
if t.statsHandler != nil {
t.statsHandler.HandleConn(t.ctx, &stats.ConnEnd{
Client: true,
}
sh.HandleConn(t.ctx, connEnd)
})
}
}

Expand Down Expand Up @@ -1602,22 +1597,20 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
}
}

for _, sh := range t.statsHandlers {
if t.statsHandler != nil {
if !endStream {
inHeader := &stats.InHeader{
t.statsHandler.HandleRPC(s.ctx, &stats.InHeader{
Client: true,
WireLength: int(frame.Header().Length),
Header: metadata.MD(mdata).Copy(),
Compression: s.recvCompress,
}
sh.HandleRPC(s.ctx, inHeader)
})
} else {
inTrailer := &stats.InTrailer{
t.statsHandler.HandleRPC(s.ctx, &stats.InTrailer{
Client: true,
WireLength: int(frame.Header().Length),
Trailer: metadata.MD(mdata).Copy(),
}
sh.HandleRPC(s.ctx, inTrailer)
})
}
}

Expand Down
15 changes: 7 additions & 8 deletions internal/transport/http2_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ type http2Server struct {
// updates, reset streams, and various settings) to the controller.
controlBuf *controlBuffer
fc *trInFlow
stats []stats.Handler
stats stats.Handler
// Keepalive and max-age parameters for the server.
kp keepalive.ServerParameters
// Keepalive enforcement policy.
Expand Down Expand Up @@ -261,7 +261,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
fc: &trInFlow{limit: uint32(icwz)},
state: reachable,
activeStreams: make(map[uint32]*ServerStream),
stats: config.StatsHandlers,
stats: config.StatsHandler,
kp: kp,
idle: time.Now(),
kep: kep,
Expand Down Expand Up @@ -1059,14 +1059,13 @@ func (t *http2Server) writeHeaderLocked(s *ServerStream) error {
t.closeStream(s, true, http2.ErrCodeInternal, false)
return ErrHeaderListSizeLimitViolation
}
for _, sh := range t.stats {
if t.stats != nil {
// Note: Headers are compressed with hpack after this call returns.
// No WireLength field is set here.
outHeader := &stats.OutHeader{
t.stats.HandleRPC(s.Context(), &stats.OutHeader{
Header: s.header.Copy(),
Compression: s.sendCompress,
}
sh.HandleRPC(s.Context(), outHeader)
})
}
return nil
}
Expand Down Expand Up @@ -1134,10 +1133,10 @@ func (t *http2Server) writeStatus(s *ServerStream, st *status.Status) error {
// Send a RST_STREAM after the trailers if the client has not already half-closed.
rst := s.getState() == streamActive
t.finishStream(s, rst, http2.ErrCodeNo, trailingHeader, true)
for _, sh := range t.stats {
if t.stats != nil {
// Note: The trailer fields are compressed with hpack after this call returns.
// No WireLength field is set here.
sh.HandleRPC(s.Context(), &stats.OutTrailer{
t.stats.HandleRPC(s.Context(), &stats.OutTrailer{
Trailer: s.trailer.Copy(),
})
}
Expand Down
2 changes: 1 addition & 1 deletion internal/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ type ServerConfig struct {
ConnectionTimeout time.Duration
Credentials credentials.TransportCredentials
InTapHandle tap.ServerInHandle
StatsHandlers []stats.Handler
StatsHandler stats.Handler
KeepaliveParams keepalive.ServerParameters
KeepalivePolicy keepalive.EnforcementPolicy
InitialWindowSize int32
Expand Down
Loading