DEV Community

Divya Darshana
Divya Darshana

Posted on

πŸš€ Building Observable gRPC Streaming Services with GoFr: A Step-by-Step Guide

gRPC + GoFr Observability

If you've ever built real-time systems with gRPC streaming, you know how challenging it can be to monitor long-lived connections. Today, I'll show you how to add full observability (logs, metrics, traces) to your gRPC streams using GoFr – with almost zero manual instrumentation!

Why This Matters?

gRPC streaming enables powerful patterns:

  • πŸ“‘ Real-time dashboards
  • πŸ“ˆ Live data pipelines
  • πŸ€– Chatbots and multiplayer games

But without proper observability:

  • ❌ You're blind to failing streams
  • ❌ Can't track message throughput
  • ❌ Hard to debug cross-service issues

GoFr solves this with automatic instrumentation for streaming endpoints!


πŸ› οΈ What We'll Build

A chat service demonstrating all 3 streaming types:

  1. Server Streaming: Push notifications
  2. Client Streaming: Batch processing
  3. Bidirectional: Real-time chat

With built-in:

  • βœ… Structured Logging
  • βœ… Prometheus Format Metrics
  • βœ… Tracing
  • βœ… Health checks

🧰 Prerequisites

  1. Go 1.20+ installed
  2. Basic gRPC knowledge

Prerequisites:

# Install GoFr CLI go install gofr.dev/cli/gofr@latest 
Enter fullscreen mode Exit fullscreen mode

Step 1: Project Setup

mkdir grpc-streaming-demo cd grpc-streaming-demo go mod init github.com/yourusername/grpc-streaming-demo # Create proto directory mkdir -p server 
Enter fullscreen mode Exit fullscreen mode

Step 2: Define Service Contract (server/chat.proto)

syntax = "proto3"; option go_package = "github.com/yourusername/grpc-streaming-demo/server"; message ChatMessage { string user = 1; string content = 2; } service ChatService { // Server pushes notifications to client rpc Notifications(ChatMessage) returns (stream ChatMessage); // Client uploads message history rpc UploadHistory(stream ChatMessage) returns (ChatMessage); // Real-time bidirectional chat rpc LiveChat(stream ChatMessage) returns (stream ChatMessage); } 
Enter fullscreen mode Exit fullscreen mode

Step 3: Generate Boilerplate Code

# Generate protobuf code protoc \ --go_out=. \ --go_opt=paths=source_relative \ --go-grpc_out=. \ --go-grpc_opt=paths=source_relative \ server/chat.proto # Generate GoFr observability wrappers gofr wrap grpc server -proto=server/chat.proto 
Enter fullscreen mode Exit fullscreen mode

This creates 6 key files:

File that must not be modified -

  1. server/chat_grpc.pb.go - Standard gRPC code
  2. server/chat.pb.go - Standard gRPC request/response type to protobuf conversion
  3. server/chatservice_gofr.go - GoFr service scaffolding
  4. server/health_gofr.go - Registering default health service on your gRPC server
  5. server/request_gofr.go - Handling Request/response through GoFr's context.

Files that can be modified -

  1. server/chatservice_server.go - Your implementation template

Step 4: Implement Service Logic (server/chatservice_server.go)

// versions: // gofr-cli v0.6.0 // gofr.dev v1.37.0 // source: chat.proto package server import ( "fmt" "io" "time" "gofr.dev/pkg/gofr" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) // Register the gRPC service in your app using the following code in your main.go: // // server.RegisterChatServiceServerWithGofr(app, &server.NewChatServiceGoFrServer()) // // ChatServiceGoFrServer defines the gRPC server implementation. // Customize the struct with required dependencies and fields as needed. type ChatServiceGoFrServer struct { health *healthServer } // Server Streaming: Sends 5 notifications to the client. func (s *ChatServiceGoFrServer) Notifications( ctx *gofr.Context, stream ChatService_NotificationsServer, ) error { req := ChatMessage{} err := ctx.Bind(&req) if err != nil { ctx.Logger.Errorf("Failed to receive request: %v", err) return status.Error(codes.InvalidArgument, "invalid initial request") } ctx.Logger.Infof("Starting notifications for user: %s", req.User) for i := 1; i <= 5; i++ { msg := &ChatMessage{ User: "System", Content: fmt.Sprintf("Notification %d", i), } if err := stream.Send(msg); err != nil { ctx.Logger.Errorf("Stream send error: %v", err) return status.Error(codes.Internal, "streaming failed") } time.Sleep(2 * time.Second) } return nil } // Client Streaming: Accepts multiple messages and returns a summary. func (s *ChatServiceGoFrServer) UploadHistory( ctx *gofr.Context, stream ChatService_UploadHistoryServer, ) error { var count int defer func() { ctx.Logger.Infof("Processed %d historical messages", count) }() for { msg, err := stream.Recv() if err == io.EOF { // Send summary response and close stream. return stream.SendAndClose(&ChatMessage{ User: "System", Content: fmt.Sprintf("Received %d messages", count), }) } if err != nil { ctx.Logger.Errorf("Receive error: %v", err) return status.Errorf(codes.Internal, "failed to receive message: %v", err) } count++ processMessage(msg) } } // Helper: Business logic placeholder. func processMessage(msg *ChatMessage) { // Stub: Log or process each message as needed. // For example: store in DB, filter, etc. } // Bidirectional Streaming: Handles live chat interaction. func (s *ChatServiceGoFrServer) LiveChat( ctx *gofr.Context, stream ChatService_LiveChatServer, ) error { errChan := make(chan error) ctx.Logger.Info("New chat session started") go func() { for { msg, err := stream.Recv() if err == io.EOF { errChan <- nil // Graceful end return } if err != nil { errChan <- status.Errorf(codes.Internal, "receive failed: %v", err) return } ctx.Logger.Infof("%s: %s", msg.User, msg.Content) resp := &ChatMessage{ User: "Server", Content: fmt.Sprintf("[%s] %s", time.Now().Format(time.RFC3339), msg.Content), } if err := stream.Send(resp); err != nil { errChan <- status.Errorf(codes.Internal, "send failed: %v", err) return } } }() // Wait for client disconnect or completion select { case err := <-errChan: return err case <-stream.Context().Done(): ctx.Logger.Warn("Client disconnected unexpectedly") return status.Error(codes.Canceled, "client disconnected") } } 
Enter fullscreen mode Exit fullscreen mode

Step 5: Configure Main Application (main.go)

package main import ( "github.com/yourusername/grpc-streaming-demo/server" "gofr.dev/pkg/gofr" ) func main() { app := gofr.New() // Register service with auto-observability server.RegisterChatServiceServerWithGofr(app, &server.ChatServiceGoFrServer{}) // Start server on port 9000 app.Run() } 
Enter fullscreen mode Exit fullscreen mode

Step 6: Run & Test

# Start server go run main.go 
Enter fullscreen mode Exit fullscreen mode

Step 7: Test Notification Stream (Server Streaming)

Image description

πŸ” Observability in Action

1. Logs (structured JSON)

Image description

2. Metrics (Prometheus)

Image description

3. Traces (GoFr's Open Source Tracer)

Image description

Pro Tips πŸ’‘

  1. Add Custom Traces
span := ctx.Trace("some-work") defer span.End() 
Enter fullscreen mode Exit fullscreen mode
  1. Monitor Stream Health with in-built healthchecks

Image description

  1. Alert on Key Metrics
# prometheus-alerts.yml - alert: HighStreamErrors expr: rate(grpc_stream_errors_total[5m]) > 0.1 
Enter fullscreen mode Exit fullscreen mode

πŸŽ‰ Conclusion & Next Steps

Thats it ! You've just built a production-ready gRPC streaming service with:

βœ… Auto-instrumented observability

βœ… All 3 streaming types

βœ… Error handling and metrics

Next Level Ideas:

  • Add JWT authentication
  • Implement rate limiting
  • Build a React frontend

Resources:

This implementation was part of my open source Contribution on GoFr – would love your feedback!

Happy Streaming! πŸŽ₯

Let me know your use cases in the comments πŸ‘‡

Top comments (0)