If you've ever built real-time systems with gRPC streaming, you know how challenging it can be to monitor long-lived connections. Today, I'll show you how to add full observability (logs, metrics, traces) to your gRPC streams using GoFr β with almost zero manual instrumentation!
Why This Matters?
gRPC streaming enables powerful patterns:
- π‘ Real-time dashboards
- π Live data pipelines
- π€ Chatbots and multiplayer games
But without proper observability:
- β You're blind to failing streams
- β Can't track message throughput
- β Hard to debug cross-service issues
GoFr solves this with automatic instrumentation for streaming endpoints!
π οΈ What We'll Build
A chat service demonstrating all 3 streaming types:
- Server Streaming: Push notifications
- Client Streaming: Batch processing
- Bidirectional: Real-time chat
With built-in:
- β Structured Logging
- β Prometheus Format Metrics
- β Tracing
- β Health checks
π§° Prerequisites
- Go 1.20+ installed
- Basic gRPC knowledge
Prerequisites:
# Install GoFr CLI go install gofr.dev/cli/gofr@latest
Step 1: Project Setup
mkdir grpc-streaming-demo cd grpc-streaming-demo go mod init github.com/yourusername/grpc-streaming-demo # Create proto directory mkdir -p server
Step 2: Define Service Contract (server/chat.proto
)
syntax = "proto3"; option go_package = "github.com/yourusername/grpc-streaming-demo/server"; message ChatMessage { string user = 1; string content = 2; } service ChatService { // Server pushes notifications to client rpc Notifications(ChatMessage) returns (stream ChatMessage); // Client uploads message history rpc UploadHistory(stream ChatMessage) returns (ChatMessage); // Real-time bidirectional chat rpc LiveChat(stream ChatMessage) returns (stream ChatMessage); }
Step 3: Generate Boilerplate Code
# Generate protobuf code protoc \ --go_out=. \ --go_opt=paths=source_relative \ --go-grpc_out=. \ --go-grpc_opt=paths=source_relative \ server/chat.proto # Generate GoFr observability wrappers gofr wrap grpc server -proto=server/chat.proto
This creates 6 key files:
File that must not be modified -
-
server/chat_grpc.pb.go
- Standard gRPC code -
server/chat.pb.go
- Standard gRPC request/response type to protobuf conversion -
server/chatservice_gofr.go
- GoFr service scaffolding -
server/health_gofr.go
- Registering default health service on your gRPC server -
server/request_gofr.go
- Handling Request/response through GoFr's context.
Files that can be modified -
-
server/chatservice_server.go
- Your implementation template
Step 4: Implement Service Logic (server/chatservice_server.go
)
// versions: // gofr-cli v0.6.0 // gofr.dev v1.37.0 // source: chat.proto package server import ( "fmt" "io" "time" "gofr.dev/pkg/gofr" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) // Register the gRPC service in your app using the following code in your main.go: // // server.RegisterChatServiceServerWithGofr(app, &server.NewChatServiceGoFrServer()) // // ChatServiceGoFrServer defines the gRPC server implementation. // Customize the struct with required dependencies and fields as needed. type ChatServiceGoFrServer struct { health *healthServer } // Server Streaming: Sends 5 notifications to the client. func (s *ChatServiceGoFrServer) Notifications( ctx *gofr.Context, stream ChatService_NotificationsServer, ) error { req := ChatMessage{} err := ctx.Bind(&req) if err != nil { ctx.Logger.Errorf("Failed to receive request: %v", err) return status.Error(codes.InvalidArgument, "invalid initial request") } ctx.Logger.Infof("Starting notifications for user: %s", req.User) for i := 1; i <= 5; i++ { msg := &ChatMessage{ User: "System", Content: fmt.Sprintf("Notification %d", i), } if err := stream.Send(msg); err != nil { ctx.Logger.Errorf("Stream send error: %v", err) return status.Error(codes.Internal, "streaming failed") } time.Sleep(2 * time.Second) } return nil } // Client Streaming: Accepts multiple messages and returns a summary. func (s *ChatServiceGoFrServer) UploadHistory( ctx *gofr.Context, stream ChatService_UploadHistoryServer, ) error { var count int defer func() { ctx.Logger.Infof("Processed %d historical messages", count) }() for { msg, err := stream.Recv() if err == io.EOF { // Send summary response and close stream. return stream.SendAndClose(&ChatMessage{ User: "System", Content: fmt.Sprintf("Received %d messages", count), }) } if err != nil { ctx.Logger.Errorf("Receive error: %v", err) return status.Errorf(codes.Internal, "failed to receive message: %v", err) } count++ processMessage(msg) } } // Helper: Business logic placeholder. func processMessage(msg *ChatMessage) { // Stub: Log or process each message as needed. // For example: store in DB, filter, etc. } // Bidirectional Streaming: Handles live chat interaction. func (s *ChatServiceGoFrServer) LiveChat( ctx *gofr.Context, stream ChatService_LiveChatServer, ) error { errChan := make(chan error) ctx.Logger.Info("New chat session started") go func() { for { msg, err := stream.Recv() if err == io.EOF { errChan <- nil // Graceful end return } if err != nil { errChan <- status.Errorf(codes.Internal, "receive failed: %v", err) return } ctx.Logger.Infof("%s: %s", msg.User, msg.Content) resp := &ChatMessage{ User: "Server", Content: fmt.Sprintf("[%s] %s", time.Now().Format(time.RFC3339), msg.Content), } if err := stream.Send(resp); err != nil { errChan <- status.Errorf(codes.Internal, "send failed: %v", err) return } } }() // Wait for client disconnect or completion select { case err := <-errChan: return err case <-stream.Context().Done(): ctx.Logger.Warn("Client disconnected unexpectedly") return status.Error(codes.Canceled, "client disconnected") } }
Step 5: Configure Main Application (main.go
)
package main import ( "github.com/yourusername/grpc-streaming-demo/server" "gofr.dev/pkg/gofr" ) func main() { app := gofr.New() // Register service with auto-observability server.RegisterChatServiceServerWithGofr(app, &server.ChatServiceGoFrServer{}) // Start server on port 9000 app.Run() }
Step 6: Run & Test
# Start server go run main.go
Step 7: Test Notification Stream (Server Streaming)
π Observability in Action
1. Logs (structured JSON)
2. Metrics (Prometheus)
3. Traces (GoFr's Open Source Tracer)
Pro Tips π‘
- Add Custom Traces
span := ctx.Trace("some-work") defer span.End()
- Monitor Stream Health with in-built healthchecks
- Alert on Key Metrics
# prometheus-alerts.yml - alert: HighStreamErrors expr: rate(grpc_stream_errors_total[5m]) > 0.1
π Conclusion & Next Steps
Thats it ! You've just built a production-ready gRPC streaming service with:
β
Auto-instrumented observability
β
All 3 streaming types
β
Error handling and metrics
Next Level Ideas:
- Add JWT authentication
- Implement rate limiting
- Build a React frontend
Resources:
This implementation was part of my open source Contribution on GoFr β would love your feedback!
Happy Streaming! π₯
Let me know your use cases in the comments π
Top comments (0)