DEV Community

Danilo O. Pinheiro, dopme.io

🧩 My First Communication with MCP and .NET – Part 4

Complete Integration with gRPC & WebSocket

In this fourth part of the series "My First Communication with MCP and .NET", we build a hybrid architecture that combines the strengths of both protocols: gRPC's efficiency and strong typing for internal service-to-service communication, and WebSocket's flexibility and native client support for web and mobile. The result is a robust, scalable MCP ecosystem.


🚀 Introduction

After exploring gRPC (Part 2) and WebSocket (Part 3) individually, an important architectural question comes up: what if we need the benefits of both?

In modern enterprise systems, it is common to have:

  • Backend microservices that communicate over gRPC (low latency, strong typing)
  • Web/mobile clients that need WebSocket (native support, real-time)
  • MCP agents that can use either protocol depending on context

This article shows how to build a unified architecture that exposes both protocols, with intelligent routing, protocol adapters, and centralized observability.


🧠 Hybrid Architecture: The Best of Both Worlds

Architecture Overview

    ┌───────────────────────────────────────────────────────────────┐
    │                       API Gateway / BFF                       │
    │                    (Protocol Orchestrator)                    │
    └───────────────┬───────────────────────────────┬───────────────┘
                    │                               │
       ┌────────────▼────────────┐     ┌────────────▼────────────┐
       │      gRPC Endpoint      │     │   WebSocket Endpoint    │
       │       (Port 5001)       │     │       (Port 5002)       │
       └────────────┬────────────┘     └────────────┬────────────┘
                    │     ┌─────────────────────────┤
                    │     │                         │
       ┌────────────▼─────▼──────┐     ┌────────────▼────────────┐
       │ Protocol Adapter Layer  │     │    SignalR Hub Layer    │
       │   (gRPC ↔ WebSocket)    │     │    (Client Manager)     │
       └────────────┬────────────┘     └────────────┬────────────┘
                    │                               │
       ┌────────────▼───────────────────────────────▼────────────┐
       │                 MCP Kernel Orchestrator                 │
       │              (Unified Command Processing)               │
       └────────────┬───────────────────────────────┬────────────┘
                    │                               │
       ┌────────────▼────────────┐     ┌────────────▼────────────┐
       │     Domain Services     │     │    Event Bus (Redis)    │
       │    (Business Logic)     │     │    (Cross-Protocol)     │
       └─────────────────────────┘     └─────────────────────────┘

Main Components

  1. Protocol Orchestrator - Decides which protocol to use based on context
  2. Protocol Adapter Layer - Converts messages between gRPC and WebSocket
  3. Unified MCP Kernel - Processes commands regardless of protocol
  4. Event Bus - Synchronizes events across instances and protocols
  5. Observability Layer - Unified tracing for both protocols
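
As a rough illustration of the first component, the sketch below picks a protocol from the kind of caller. It is only a minimal sketch: `CallerKind` and `ProtocolSelector` are hypothetical names introduced here, and the rule (backend services get gRPC, browser and mobile clients get WebSocket) is an assumption drawn from the introduction rather than the series' actual orchestrator. `ProtocolType` is repeated from the shared contracts so the snippet compiles on its own.

```csharp
using System;

// Mirrors the ProtocolType enum from the shared contracts.
public enum ProtocolType { Unknown = 0, Grpc = 1, WebSocket = 2, Http = 3 }

// Hypothetical classification of callers, for illustration only.
public enum CallerKind { BackendService, Browser, Mobile, Other }

public static class ProtocolSelector
{
    // Assumed rule: internal services use gRPC, web/mobile clients use
    // WebSocket, anything else falls back to plain HTTP.
    public static ProtocolType Select(CallerKind caller) => caller switch
    {
        CallerKind.BackendService => ProtocolType.Grpc,
        CallerKind.Browser or CallerKind.Mobile => ProtocolType.WebSocket,
        _ => ProtocolType.Http
    };
}
```

A real orchestrator would likely look at more context (payload size, need for server push, client capabilities), but the decision can stay a pure function like this one, which keeps it easy to unit test.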

🏗️ Implementing the Hybrid Architecture

1️⃣ Project Structure

    MCPPipeline.Hybrid/
    ├── src/
    │   ├── MCPPipeline.Contracts/           # Shared models
    │   │   ├── Messages/
    │   │   │   ├── MCPCommand.cs
    │   │   │   ├── MCPResponse.cs
    │   │   │   └── MCPEvent.cs
    │   │   └── Protos/
    │   │       └── mcp.proto
    │   │
    │   ├── MCPPipeline.Core/                # Business logic
    │   │   ├── Services/
    │   │   │   ├── IMCPKernelService.cs
    │   │   │   └── MCPKernelService.cs
    │   │   └── Events/
    │   │       └── IEventBus.cs
    │   │
    │   ├── MCPPipeline.GrpcService/         # gRPC endpoint
    │   │   ├── Services/
    │   │   │   └── MCPGrpcService.cs
    │   │   └── Program.cs
    │   │
    │   ├── MCPPipeline.WebSocketService/    # WebSocket endpoint
    │   │   ├── Hubs/
    │   │   │   └── MCPHub.cs
    │   │   └── Program.cs
    │   │
    │   ├── MCPPipeline.Gateway/             # API Gateway (YARP)
    │   │   ├── Configuration/
    │   │   └── Program.cs
    │   │
    │   └── MCPPipeline.Adapter/             # Protocol Adapter
    │       ├── GrpcToWebSocketAdapter.cs
    │       └── WebSocketToGrpcAdapter.cs
    │
    └── tests/
        └── MCPPipeline.IntegrationTests/

2️⃣ Shared Contracts

    // MCPPipeline.Contracts/Messages/MCPCommand.cs
    namespace MCPPipeline.Contracts.Messages;

    public record MCPCommand
    {
        public string CommandId { get; init; } = Guid.NewGuid().ToString();
        public string Command { get; init; } = string.Empty;
        public string Payload { get; init; } = string.Empty;
        public Dictionary<string, string> Metadata { get; init; } = new();
        public DateTime Timestamp { get; init; } = DateTime.UtcNow;
        public string SessionId { get; init; } = string.Empty;
        public ProtocolType Protocol { get; init; }
        public int Priority { get; init; } = 0;
    }

    public record MCPResponse
    {
        public string CommandId { get; init; } = string.Empty;
        public string Result { get; init; } = string.Empty;
        public ResponseStatus Status { get; init; }
        public string? ErrorMessage { get; init; }
        public long ProcessingTimeMs { get; init; }
        public DateTime Timestamp { get; init; } = DateTime.UtcNow;
        public Dictionary<string, object> Metrics { get; init; } = new();
    }

    public record MCPEvent
    {
        public string EventId { get; init; } = Guid.NewGuid().ToString();
        public string EventType { get; init; } = string.Empty;
        public string Source { get; init; } = string.Empty;
        public object Data { get; init; } = new();
        public DateTime Timestamp { get; init; } = DateTime.UtcNow;
    }

    public enum ProtocolType
    {
        Unknown = 0,
        Grpc = 1,
        WebSocket = 2,
        Http = 3
    }

    public enum ResponseStatus
    {
        Success,
        Error,
        Processing,
        Timeout
    }

3️⃣ Unified MCP Kernel

    // MCPPipeline.Core/Services/MCPKernelService.cs
    using System.Diagnostics;
    using System.Runtime.CompilerServices; // [EnumeratorCancellation]
    using MCPPipeline.Contracts.Messages;
    using MCPPipeline.Core.Events;
    using Microsoft.Extensions.Logging;

    namespace MCPPipeline.Core.Services;

    public interface IMCPKernelService
    {
        Task<MCPResponse> ExecuteCommandAsync(MCPCommand command, CancellationToken ct);
        IAsyncEnumerable<string> StreamCommandAsync(MCPCommand command, CancellationToken ct);
    }

    public class MCPKernelService : IMCPKernelService
    {
        private readonly ILogger<MCPKernelService> _logger;
        private readonly IEventBus _eventBus;
        private static readonly ActivitySource ActivitySource = new("MCPPipeline.Core");

        public MCPKernelService(
            ILogger<MCPKernelService> logger,
            IEventBus eventBus)
        {
            _logger = logger;
            _eventBus = eventBus;
        }

        public async Task<MCPResponse> ExecuteCommandAsync(
            MCPCommand command,
            CancellationToken ct)
        {
            using var activity = ActivitySource.StartActivity("ExecuteCommand");
            activity?.SetTag("command.id", command.CommandId);
            activity?.SetTag("command.type", command.Command);
            activity?.SetTag("protocol", command.Protocol.ToString());

            var sw = Stopwatch.StartNew();

            try
            {
                _logger.LogInformation(
                    "Executing command {Command} via {Protocol} | Session: {SessionId}",
                    command.Command, command.Protocol, command.SessionId);

                // Publish the start event
                await _eventBus.PublishAsync(new MCPEvent
                {
                    EventType = "CommandStarted",
                    Source = command.Protocol.ToString(),
                    Data = new { command.CommandId, command.Command }
                });

                // Process the command
                var result = await ProcessCommandAsync(command, ct);

                sw.Stop();

                var response = new MCPResponse
                {
                    CommandId = command.CommandId,
                    Result = result,
                    Status = ResponseStatus.Success,
                    ProcessingTimeMs = sw.ElapsedMilliseconds,
                    Metrics = new Dictionary<string, object>
                    {
                        ["protocol"] = command.Protocol.ToString(),
                        ["priority"] = command.Priority
                    }
                };

                // Publish the completion event
                await _eventBus.PublishAsync(new MCPEvent
                {
                    EventType = "CommandCompleted",
                    Source = command.Protocol.ToString(),
                    Data = new { command.CommandId, response.ProcessingTimeMs }
                });

                activity?.SetTag("response.status", "success");
                return response;
            }
            catch (Exception ex)
            {
                sw.Stop();

                _logger.LogError(ex,
                    "Error executing command {Command} via {Protocol}",
                    command.Command, command.Protocol);

                activity?.SetTag("response.status", "error");
                activity?.SetTag("error.message", ex.Message);

                await _eventBus.PublishAsync(new MCPEvent
                {
                    EventType = "CommandFailed",
                    Source = command.Protocol.ToString(),
                    Data = new { command.CommandId, Error = ex.Message }
                });

                return new MCPResponse
                {
                    CommandId = command.CommandId,
                    Status = ResponseStatus.Error,
                    ErrorMessage = ex.Message,
                    ProcessingTimeMs = sw.ElapsedMilliseconds
                };
            }
        }

        public async IAsyncEnumerable<string> StreamCommandAsync(
            MCPCommand command,
            [EnumeratorCancellation] CancellationToken ct)
        {
            using var activity = ActivitySource.StartActivity("StreamCommand");
            activity?.SetTag("command.id", command.CommandId);

            _logger.LogInformation(
                "Starting streaming for command {Command} via {Protocol}",
                command.Command, command.Protocol);

            var chunks = await GenerateStreamChunksAsync(command.Payload, ct);

            for (int i = 0; i < chunks.Count; i++)
            {
                if (ct.IsCancellationRequested)
                    yield break;

                yield return chunks[i];
                await Task.Delay(50, ct); // Simulates processing
            }
        }

        private async Task<string> ProcessCommandAsync(MCPCommand command, CancellationToken ct)
        {
            return command.Command.ToLowerInvariant() switch
            {
                "analyze" => await AnalyzeAsync(command.Payload, ct),
                "summarize" => await SummarizeAsync(command.Payload, ct),
                "translate" => await TranslateAsync(command.Payload, ct),
                "generate" => await GenerateAsync(command.Payload, ct),
                "status" => await GetStatusAsync(ct),
                _ => throw new InvalidOperationException($"Unknown command: {command.Command}")
            };
        }

        private async Task<string> AnalyzeAsync(string payload, CancellationToken ct)
        {
            await Task.Delay(200, ct);
            var words = payload.Split(' ', StringSplitOptions.RemoveEmptyEntries).Length;
            // Result text kept as in the original: the integration tests assert on "palavras".
            return $"Análise: {words} palavras, {payload.Length} caracteres";
        }

        private async Task<string> SummarizeAsync(string payload, CancellationToken ct)
        {
            await Task.Delay(300, ct);
            return payload.Length > 100
                ? $"Summary: {payload[..100]}..."
                : $"Summary: {payload}";
        }

        private async Task<string> TranslateAsync(string payload, CancellationToken ct)
        {
            await Task.Delay(250, ct);
            return $"[Translated] {payload}";
        }

        private async Task<string> GenerateAsync(string payload, CancellationToken ct)
        {
            await Task.Delay(500, ct);
            return $"Content generated from: {payload}";
        }

        private async Task<string> GetStatusAsync(CancellationToken ct)
        {
            await Task.Delay(50, ct);
            return $"System operational | Timestamp: {DateTime.UtcNow:O}";
        }

        private async Task<List<string>> GenerateStreamChunksAsync(string prompt, CancellationToken ct)
        {
            await Task.Delay(100, ct);
            return new List<string>
            {
                "Starting processing...",
                "Analyzing context...",
                "Generating response...",
                $"Result: {prompt}",
                "Processing complete."
            };
        }
    }

4️⃣ Event Bus with Redis

    // MCPPipeline.Core/Events/IEventBus.cs
    namespace MCPPipeline.Core.Events;

    public interface IEventBus
    {
        Task PublishAsync<T>(T @event) where T : class;
        Task SubscribeAsync<T>(Func<T, Task> handler) where T : class;
    }

    // MCPPipeline.Core/Events/RedisEventBus.cs
    using System.Text.Json;
    using Microsoft.Extensions.Logging;
    using StackExchange.Redis;

    namespace MCPPipeline.Core.Events;

    public class RedisEventBus : IEventBus
    {
        private readonly IConnectionMultiplexer _redis;
        private readonly ILogger<RedisEventBus> _logger;
        private readonly ISubscriber _subscriber;

        public RedisEventBus(
            IConnectionMultiplexer redis,
            ILogger<RedisEventBus> logger)
        {
            _redis = redis;
            _logger = logger;
            _subscriber = redis.GetSubscriber();
        }

        public async Task PublishAsync<T>(T @event) where T : class
        {
            // One Redis pub/sub channel per event type name
            var channel = typeof(T).Name;
            var message = JsonSerializer.Serialize(@event);

            await _subscriber.PublishAsync(channel, message);

            _logger.LogDebug("Event published: {EventType}", channel);
        }

        public async Task SubscribeAsync<T>(Func<T, Task> handler) where T : class
        {
            var channel = typeof(T).Name;

            await _subscriber.SubscribeAsync(channel, async (ch, message) =>
            {
                try
                {
                    // Convert the RedisValue to a string explicitly before deserializing
                    var json = (string?)message;
                    if (json is null) return;

                    var @event = JsonSerializer.Deserialize<T>(json);
                    if (@event != null)
                    {
                        await handler(@event);
                    }
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "Error processing event {EventType}", channel);
                }
            });

            _logger.LogInformation("Subscribed to channel: {Channel}", channel);
        }
    }
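
For unit tests or local runs without a Redis instance, an in-process stand-in with the same interface can be handy. The sketch below is only that: `InMemoryEventBus` is a hypothetical class that is not part of the series, and it differs from the Redis version in that it delivers events synchronously inside the publishing process and lets handler exceptions propagate to the publisher. The `IEventBus` interface is repeated so the snippet compiles on its own.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Same shape as the article's IEventBus, repeated so the sketch is self-contained.
public interface IEventBus
{
    Task PublishAsync<T>(T @event) where T : class;
    Task SubscribeAsync<T>(Func<T, Task> handler) where T : class;
}

// Hypothetical in-process event bus: handlers are keyed by event type.
public sealed class InMemoryEventBus : IEventBus
{
    private readonly ConcurrentDictionary<Type, ConcurrentBag<Func<object, Task>>> _handlers = new();

    public async Task PublishAsync<T>(T @event) where T : class
    {
        if (!_handlers.TryGetValue(typeof(T), out var handlers))
            return; // nobody subscribed to this event type

        // Iterate over a snapshot so subscriptions added mid-publish are ignored.
        foreach (var handler in handlers.ToArray())
        {
            await handler(@event);
        }
    }

    public Task SubscribeAsync<T>(Func<T, Task> handler) where T : class
    {
        var bag = _handlers.GetOrAdd(typeof(T), _ => new ConcurrentBag<Func<object, Task>>());
        bag.Add(e => handler((T)e));
        return Task.CompletedTask;
    }
}
```

It can be registered in place of the Redis implementation with `builder.Services.AddSingleton<IEventBus, InMemoryEventBus>()`. Because it lives inside a single process, it cannot synchronize events between the gRPC and WebSocket services the way the Redis bus does.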

5️⃣ Protocol Adapter

    // MCPPipeline.Adapter/ProtocolAdapter.cs
    using MCPPipeline.Contracts.Messages;
    using MCPPipeline.Grpc;

    namespace MCPPipeline.Adapter;

    public interface IProtocolAdapter
    {
        MCPCommand FromGrpcRequest(MCPRequest grpcRequest);
        MCPRequest ToGrpcRequest(MCPCommand command);
        MCPResponse FromGrpcResponse(MCPGrpcResponse grpcResponse);
        MCPGrpcResponse ToGrpcResponse(MCPResponse response);
    }

    public class ProtocolAdapter : IProtocolAdapter
    {
        public MCPCommand FromGrpcRequest(MCPRequest grpcRequest)
        {
            return new MCPCommand
            {
                Command = grpcRequest.Command,
                Payload = grpcRequest.Payload,
                Metadata = grpcRequest.Metadata.ToDictionary(k => k.Key, v => v.Value),
                // UtcDateTime keeps Kind = Utc, so the value converts back to the same instant
                Timestamp = DateTimeOffset.FromUnixTimeMilliseconds(grpcRequest.Timestamp).UtcDateTime,
                Protocol = ProtocolType.Grpc
            };
        }

        public MCPRequest ToGrpcRequest(MCPCommand command)
        {
            return new MCPRequest
            {
                Command = command.Command,
                Payload = command.Payload,
                Metadata = { command.Metadata },
                Timestamp = new DateTimeOffset(command.Timestamp).ToUnixTimeMilliseconds()
            };
        }

        public MCPResponse FromGrpcResponse(MCPGrpcResponse grpcResponse)
        {
            return new MCPResponse
            {
                Result = grpcResponse.Result,
                Status = grpcResponse.Status switch
                {
                    "OK" => ResponseStatus.Success,
                    "ERROR" => ResponseStatus.Error,
                    "PROCESSING" => ResponseStatus.Processing,
                    "TIMEOUT" => ResponseStatus.Timeout,
                    _ => ResponseStatus.Error
                },
                ErrorMessage = grpcResponse.ErrorMessage,
                ProcessingTimeMs = grpcResponse.ProcessingTimeMs
            };
        }

        public MCPGrpcResponse ToGrpcResponse(MCPResponse response)
        {
            return new MCPGrpcResponse
            {
                Result = response.Result,
                // Use the same status strings FromGrpcResponse and the client expect.
                // Status.ToString().ToUpperInvariant() would emit "SUCCESS", which reads back as an error.
                Status = response.Status switch
                {
                    ResponseStatus.Success => "OK",
                    ResponseStatus.Processing => "PROCESSING",
                    ResponseStatus.Timeout => "TIMEOUT",
                    _ => "ERROR"
                },
                ErrorMessage = response.ErrorMessage ?? string.Empty,
                ProcessingTimeMs = response.ProcessingTimeMs
            };
        }
    }
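
The adapter moves timestamps between a .NET `DateTime` and Unix milliseconds, and that conversion is easy to get subtly wrong when the `DateTime` has no UTC kind. The helper below is a minimal sketch of a round trip that stays in UTC. `TimestampMapping` is a hypothetical name, and the code assumes the input value already represents a UTC instant.

```csharp
using System;

// Hypothetical helper illustrating a UTC-safe timestamp round trip.
public static class TimestampMapping
{
    // Assumes 'utc' already holds a UTC instant; SpecifyKind only labels it as such.
    public static long ToUnixMs(DateTime utc) =>
        new DateTimeOffset(DateTime.SpecifyKind(utc, DateTimeKind.Utc)).ToUnixTimeMilliseconds();

    // UtcDateTime returns a DateTime with Kind = Utc (the DateTime property returns Unspecified).
    public static DateTime FromUnixMs(long unixMs) =>
        DateTimeOffset.FromUnixTimeMilliseconds(unixMs).UtcDateTime;
}
```

Values are truncated to millisecond precision on the way out, so a timestamp taken from `DateTime.UtcNow` will usually not compare equal to its round-tripped copy at tick level.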

6️⃣ gRPC Service Endpoint

    // MCPPipeline.GrpcService/Services/MCPGrpcService.cs
    using Grpc.Core;
    using MCPPipeline.Grpc;
    using MCPPipeline.Core.Services;
    using MCPPipeline.Adapter;

    namespace MCPPipeline.GrpcService.Services;

    public class MCPGrpcServiceImpl : MCPService.MCPServiceBase
    {
        private readonly IMCPKernelService _kernelService;
        private readonly IProtocolAdapter _adapter;
        private readonly ILogger<MCPGrpcServiceImpl> _logger;

        public MCPGrpcServiceImpl(
            IMCPKernelService kernelService,
            IProtocolAdapter adapter,
            ILogger<MCPGrpcServiceImpl> logger)
        {
            _kernelService = kernelService;
            _adapter = adapter;
            _logger = logger;
        }

        public override async Task<MCPGrpcResponse> SendCommand(
            MCPRequest request,
            ServerCallContext context)
        {
            _logger.LogInformation(
                "Received gRPC command: {Command} from {Peer}",
                request.Command, context.Peer);

            // Convert to the unified model
            var command = _adapter.FromGrpcRequest(request);
            command = command with { SessionId = context.Peer };

            // Process through the unified kernel
            var response = await _kernelService.ExecuteCommandAsync(
                command,
                context.CancellationToken);

            // Convert the response
            return _adapter.ToGrpcResponse(response);
        }

        public override async Task StreamCommand(
            MCPRequest request,
            IServerStreamWriter<MCPStreamResponse> responseStream,
            ServerCallContext context)
        {
            _logger.LogInformation(
                "Starting gRPC streaming: {Command}",
                request.Command);

            var command = _adapter.FromGrpcRequest(request);
            int chunkIndex = 0;

            await foreach (var chunk in _kernelService.StreamCommandAsync(
                command,
                context.CancellationToken))
            {
                await responseStream.WriteAsync(new MCPStreamResponse
                {
                    Content = chunk,
                    ChunkIndex = chunkIndex++,
                    IsComplete = false
                });
            }

            // Send the final chunk
            await responseStream.WriteAsync(new MCPStreamResponse
            {
                Content = "Stream complete",
                ChunkIndex = chunkIndex,
                IsComplete = true
            });
        }

        public override Task<HealthResponse> HealthCheck(
            HealthRequest request,
            ServerCallContext context)
        {
            return Task.FromResult(new HealthResponse
            {
                Status = "Healthy",
                Version = "1.0.0",
                Protocol = "gRPC"
            });
        }
    }

    // Program.cs
    using MCPPipeline.Adapter;
    using MCPPipeline.Core.Events;
    using MCPPipeline.Core.Services;
    using MCPPipeline.GrpcService.Services;
    using StackExchange.Redis;

    var builder = WebApplication.CreateBuilder(args);

    builder.Services.AddGrpc();
    builder.Services.AddSingleton<IMCPKernelService, MCPKernelService>();
    builder.Services.AddSingleton<IProtocolAdapter, ProtocolAdapter>();

    // Redis for the event bus
    builder.Services.AddSingleton<IConnectionMultiplexer>(
        ConnectionMultiplexer.Connect("localhost:6379"));
    builder.Services.AddSingleton<IEventBus, RedisEventBus>();

    var app = builder.Build();

    app.MapGrpcService<MCPGrpcServiceImpl>();
    app.MapGet("/", () => "MCP gRPC Service");

    app.Run();

7️⃣ WebSocket Service Endpoint

    // MCPPipeline.WebSocketService/Hubs/MCPHub.cs
    using Microsoft.AspNetCore.SignalR;
    using MCPPipeline.Contracts.Messages;
    using MCPPipeline.Core.Services;

    namespace MCPPipeline.WebSocketService.Hubs;

    public class MCPHub : Hub
    {
        private readonly IMCPKernelService _kernelService;
        private readonly ILogger<MCPHub> _logger;

        public MCPHub(
            IMCPKernelService kernelService,
            ILogger<MCPHub> logger)
        {
            _kernelService = kernelService;
            _logger = logger;
        }

        public override async Task OnConnectedAsync()
        {
            _logger.LogInformation("WebSocket client connected: {ConnectionId}",
                Context.ConnectionId);

            await Clients.Caller.SendAsync("Connected", new
            {
                ConnectionId = Context.ConnectionId,
                Protocol = "WebSocket",
                Message = "Connected to the MCP Hub"
            });

            await base.OnConnectedAsync();
        }

        public async Task SendCommand(MCPCommand command)
        {
            _logger.LogInformation(
                "Received WebSocket command: {Command} from {ConnectionId}",
                command.Command, Context.ConnectionId);

            // Enrich the command with WebSocket data
            var enrichedCommand = command with
            {
                Protocol = ProtocolType.WebSocket,
                SessionId = Context.ConnectionId
            };

            // Process through the unified kernel
            var response = await _kernelService.ExecuteCommandAsync(
                enrichedCommand,
                Context.ConnectionAborted);

            // Send the response
            await Clients.Caller.SendAsync("CommandResponse", response);
        }

        public async Task StreamCommand(MCPCommand command)
        {
            _logger.LogInformation(
                "Starting WebSocket streaming: {Command}",
                command.Command);

            var enrichedCommand = command with
            {
                Protocol = ProtocolType.WebSocket,
                SessionId = Context.ConnectionId
            };

            await foreach (var chunk in _kernelService.StreamCommandAsync(
                enrichedCommand,
                Context.ConnectionAborted))
            {
                await Clients.Caller.SendAsync("StreamChunk", new
                {
                    Content = chunk,
                    Timestamp = DateTime.UtcNow
                });
            }

            await Clients.Caller.SendAsync("StreamComplete", new
            {
                Message = "Streaming complete"
            });
        }
    }

    // Program.cs
    using MCPPipeline.Core.Events;
    using MCPPipeline.Core.Services;
    using MCPPipeline.WebSocketService.Hubs;
    using StackExchange.Redis;

    var builder = WebApplication.CreateBuilder(args);

    builder.Services.AddSignalR();
    builder.Services.AddSingleton<IMCPKernelService, MCPKernelService>();

    // Redis for the event bus
    builder.Services.AddSingleton<IConnectionMultiplexer>(
        ConnectionMultiplexer.Connect("localhost:6379"));
    builder.Services.AddSingleton<IEventBus, RedisEventBus>();

    // Wide-open CORS is for local development only; browser clients that send
    // credentials need an explicit origin list instead of AllowAnyOrigin.
    builder.Services.AddCors(options =>
    {
        options.AddPolicy("AllowAll", policy =>
            policy.AllowAnyOrigin().AllowAnyMethod().AllowAnyHeader());
    });

    var app = builder.Build();

    app.UseCors("AllowAll");
    app.MapHub<MCPHub>("/mcphub");
    app.MapGet("/", () => "MCP WebSocket Service");

    app.Run();

8️⃣ API Gateway with YARP

    // MCPPipeline.Gateway/Program.cs
    var builder = WebApplication.CreateBuilder(args);

    builder.Services.AddReverseProxy()
        .LoadFromConfig(builder.Configuration.GetSection("ReverseProxy"));

    var app = builder.Build();

    app.MapReverseProxy();

    app.MapGet("/", () => Results.Ok(new
    {
        Service = "MCP Gateway",
        Endpoints = new
        {
            gRPC = "https://localhost:5001",
            WebSocket = "https://localhost:5002/mcphub"
        }
    }));

    app.Run();

    // appsettings.json
    // The PathRemovePrefix transforms strip the gateway prefix before forwarding,
    // so /ws/mcphub reaches the hub at /mcphub on the WebSocket service.
    {
      "ReverseProxy": {
        "Routes": {
          "grpc-route": {
            "ClusterId": "grpc-cluster",
            "Match": {
              "Path": "/grpc/{**catch-all}"
            },
            "Transforms": [
              { "PathRemovePrefix": "/grpc" }
            ]
          },
          "websocket-route": {
            "ClusterId": "websocket-cluster",
            "Match": {
              "Path": "/ws/{**catch-all}"
            },
            "Transforms": [
              { "PathRemovePrefix": "/ws" }
            ]
          }
        },
        "Clusters": {
          "grpc-cluster": {
            "Destinations": {
              "destination1": {
                "Address": "https://localhost:5001"
              }
            }
          },
          "websocket-cluster": {
            "Destinations": {
              "destination1": {
                "Address": "https://localhost:5002"
              }
            },
            "HttpRequest": {
              "Version": "1.1",
              "VersionPolicy": "RequestVersionOrLower"
            }
          }
        }
      }
    }

9️⃣ Unified Client

    // MCPPipeline.Client/UnifiedMCPClient.cs
    using System.Runtime.CompilerServices; // [EnumeratorCancellation]
    using Grpc.Core;                       // ReadAllAsync extension
    using Grpc.Net.Client;
    using Microsoft.AspNetCore.SignalR.Client;
    using Microsoft.Extensions.Logging;
    using MCPPipeline.Contracts.Messages;
    using MCPPipeline.Grpc;

    public class UnifiedMCPClient : IAsyncDisposable
    {
        private readonly GrpcChannel? _grpcChannel;
        private readonly MCPService.MCPServiceClient? _grpcClient;
        private readonly HubConnection? _hubConnection;
        private readonly ILogger<UnifiedMCPClient> _logger;
        private readonly bool _useGrpc;

        // Typed shape of the "StreamChunk" payload sent by the hub
        private sealed record StreamChunkMessage(string Content, DateTime Timestamp);

        public UnifiedMCPClient(
            string endpoint,
            bool useGrpc,
            ILogger<UnifiedMCPClient> logger)
        {
            _useGrpc = useGrpc;
            _logger = logger;

            if (useGrpc)
            {
                _grpcChannel = GrpcChannel.ForAddress(endpoint);
                _grpcClient = new MCPService.MCPServiceClient(_grpcChannel);
                _logger.LogInformation("Client configured for gRPC: {Endpoint}", endpoint);
            }
            else
            {
                _hubConnection = new HubConnectionBuilder()
                    .WithUrl(endpoint)
                    .WithAutomaticReconnect()
                    .Build();

                SetupWebSocketHandlers();
                _logger.LogInformation("Client configured for WebSocket: {Endpoint}", endpoint);
            }
        }

        public async Task ConnectAsync(CancellationToken ct = default)
        {
            if (!_useGrpc && _hubConnection != null)
            {
                await _hubConnection.StartAsync(ct);
                _logger.LogInformation("Connected via WebSocket");
            }
        }

        public async Task<MCPResponse> SendCommandAsync(
            string command,
            string payload,
            CancellationToken ct = default)
        {
            if (_useGrpc && _grpcClient != null)
            {
                return await SendViaGrpcAsync(command, payload, ct);
            }
            else if (_hubConnection != null)
            {
                return await SendViaWebSocketAsync(command, payload, ct);
            }

            throw new InvalidOperationException("Client not initialized");
        }

        private async Task<MCPResponse> SendViaGrpcAsync(
            string command,
            string payload,
            CancellationToken ct)
        {
            _logger.LogInformation("Sending command via gRPC: {Command}", command);

            var request = new MCPRequest
            {
                Command = command,
                Payload = payload,
                Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()
            };

            var response = await _grpcClient!.SendCommandAsync(request, cancellationToken: ct);

            return new MCPResponse
            {
                Result = response.Result,
                Status = response.Status == "OK" ? ResponseStatus.Success : ResponseStatus.Error,
                ErrorMessage = response.ErrorMessage,
                ProcessingTimeMs = response.ProcessingTimeMs
            };
        }

        private async Task<MCPResponse> SendViaWebSocketAsync(
            string command,
            string payload,
            CancellationToken ct)
        {
            _logger.LogInformation("Sending command via WebSocket: {Command}", command);

            var request = new MCPCommand
            {
                Command = command,
                Payload = payload,
                Timestamp = DateTime.UtcNow
            };

            var tcs = new TaskCompletionSource<MCPResponse>(
                TaskCreationOptions.RunContinuationsAsynchronously);

            // Complete only on the response carrying this call's CommandId, and dispose
            // just this subscription afterwards. Removing every "CommandResponse" handler
            // would break other calls that are still in flight.
            using var subscription = _hubConnection!.On<MCPResponse>("CommandResponse", response =>
            {
                if (response.CommandId == request.CommandId)
                {
                    tcs.TrySetResult(response);
                }
            });

            await _hubConnection.InvokeAsync("SendCommand", request, ct);

            return await tcs.Task.WaitAsync(TimeSpan.FromSeconds(30), ct);
        }

        public async IAsyncEnumerable<string> StreamCommandAsync(
            string command,
            string payload,
            [EnumeratorCancellation] CancellationToken ct = default)
        {
            if (_useGrpc && _grpcClient != null)
            {
                await foreach (var chunk in StreamViaGrpcAsync(command, payload, ct))
                {
                    yield return chunk;
                }
            }
            else if (_hubConnection != null)
            {
                await foreach (var chunk in StreamViaWebSocketAsync(command, payload, ct))
                {
                    yield return chunk;
                }
            }
        }

        private async IAsyncEnumerable<string> StreamViaGrpcAsync(
            string command,
            string payload,
            [EnumeratorCancellation] CancellationToken ct)
        {
            var request = new MCPRequest
            {
                Command = command,
                Payload = payload,
                Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()
            };

            using var call = _grpcClient!.StreamCommand(request, cancellationToken: ct);

            await foreach (var response in call.ResponseStream.ReadAllAsync(ct))
            {
                yield return response.Content;
            }
        }

        private async IAsyncEnumerable<string> StreamViaWebSocketAsync(
            string command,
            string payload,
            [EnumeratorCancellation] CancellationToken ct)
        {
            var chunks = System.Threading.Channels.Channel.CreateUnbounded<string>();

            // Typed handlers: with the default JSON protocol a 'dynamic' argument arrives
            // as a JsonElement, which does not support member access such as chunk.Content.
            // Note: chunks are not correlated per call, so run one stream at a time per connection.
            using var chunkSubscription = _hubConnection!.On<StreamChunkMessage>("StreamChunk", chunk =>
            {
                chunks.Writer.TryWrite(chunk.Content);
            });

            using var completeSubscription = _hubConnection.On<object>("StreamComplete", _ =>
            {
                chunks.Writer.TryComplete();
            });

            await _hubConnection.InvokeAsync("StreamCommand", new MCPCommand
            {
                Command = command,
                Payload = payload,
                Timestamp = DateTime.UtcNow
            }, ct);

            await foreach (var chunk in chunks.Reader.ReadAllAsync(ct))
            {
                yield return chunk;
            }
        }

        private void SetupWebSocketHandlers()
        {
            _hubConnection!.On<object>("Connected", data =>
            {
                _logger.LogInformation("✅ WebSocket connected: {Data}", data);
            });

            _hubConnection.Reconnecting += error =>
            {
                _logger.LogWarning("⚠️ WebSocket reconnecting: {Error}", error?.Message);
                return Task.CompletedTask;
            };

            _hubConnection.Reconnected += connectionId =>
            {
                _logger.LogInformation("✅ WebSocket reconnected: {ConnectionId}", connectionId);
                return Task.CompletedTask;
            };

            _hubConnection.Closed += error =>
            {
                _logger.LogError("❌ WebSocket closed: {Error}", error?.Message);
                return Task.CompletedTask;
            };
        }

        public async ValueTask DisposeAsync()
        {
            if (_hubConnection != null)
            {
                await _hubConnection.StopAsync();
                await _hubConnection.DisposeAsync();
            }

            _grpcChannel?.Dispose();
        }
    }

    // Usage example
    public class Program
    {
        public static async Task Main(string[] args)
        {
            var loggerFactory = LoggerFactory.Create(builder => builder.AddConsole());
            var logger = loggerFactory.CreateLogger<UnifiedMCPClient>();

            // gRPC client
            await using var grpcClient = new UnifiedMCPClient(
                "https://localhost:5001",
                useGrpc: true,
                logger);

            var response = await grpcClient.SendCommandAsync("analyze", "Teste via gRPC");
            Console.WriteLine($"gRPC: {response.Result}");

            // WebSocket client
            await using var wsClient = new UnifiedMCPClient(
                "https://localhost:5002/mcphub",
                useGrpc: false,
                logger);

            await wsClient.ConnectAsync();

            response = await wsClient.SendCommandAsync("analyze", "Teste via WebSocket");
            Console.WriteLine($"WebSocket: {response.Result}");

            // Streaming
            await foreach (var chunk in wsClient.StreamCommandAsync("generate", "Prompt de teste"))
            {
                Console.WriteLine($"Chunk: {chunk}");
            }
        }
    }

🔍 Unified Observability

OpenTelemetry Configuration

    // MCPPipeline.Observability/OpenTelemetryExtensions.cs
    using Microsoft.Extensions.DependencyInjection;
    using OpenTelemetry.Resources;
    using OpenTelemetry.Trace;
    using OpenTelemetry.Metrics;

    public static class OpenTelemetryExtensions
    {
        public static IServiceCollection AddMCPObservability(
            this IServiceCollection services,
            string serviceName)
        {
            services.AddOpenTelemetry()
                .ConfigureResource(resource =>
                {
                    resource.AddService(
                        serviceName: serviceName,
                        serviceVersion: "1.0.0");
                })
                .WithTracing(tracing =>
                {
                    tracing
                        .AddAspNetCoreInstrumentation(options =>
                        {
                            options.RecordException = true;
                            options.EnrichWithHttpRequest = (activity, request) =>
                            {
                                activity.SetTag("http.scheme", request.Scheme);
                                // Store the address as a string so exporters can serialize it
                                activity.SetTag("http.client_ip",
                                    request.HttpContext.Connection.RemoteIpAddress?.ToString());
                            };
                        })
                        .AddGrpcClientInstrumentation()
                        .AddSource("MCPPipeline.Core")
                        .AddSource("MCPPipeline.GrpcService")
                        .AddSource("MCPPipeline.WebSocketService")
                        .AddOtlpExporter(options =>
                        {
                            options.Endpoint = new Uri("http://localhost:4317");
                        });
                })
                .WithMetrics(metrics =>
                {
                    // AddPrometheusExporter only registers the exporter. Each service still has to
                    // expose the scraping endpoint in its Program.cs, for example with
                    // app.UseOpenTelemetryPrometheusScrapingEndpoint().
                    metrics
                        .AddAspNetCoreInstrumentation()
                        .AddRuntimeInstrumentation()
                        .AddMeter("MCPPipeline.*")
                        .AddPrometheusExporter()
                        .AddOtlpExporter();
                });

            return services;
        }
    }

    // Usage in the services
    builder.Services.AddMCPObservability("MCPPipeline.GrpcService");

Custom Metrics

    // MCPPipeline.Observability/MCPMetrics.cs
    using System.Diagnostics.Metrics;
    using MCPPipeline.Contracts.Messages;

    public class MCPMetrics
    {
        private readonly Meter _meter;
        private readonly Counter<long> _commandsProcessed;
        private readonly Histogram<double> _commandDuration;
        private readonly Counter<long> _commandErrors;
        private readonly UpDownCounter<int> _activeConnections;

        public MCPMetrics()
        {
            // The meter name matches the "MCPPipeline.*" pattern used in AddMeter
            _meter = new Meter("MCPPipeline.Metrics", "1.0.0");

            _commandsProcessed = _meter.CreateCounter<long>(
                "mcp.commands.processed",
                description: "Total commands processed");

            _commandDuration = _meter.CreateHistogram<double>(
                "mcp.command.duration",
                unit: "ms",
                description: "Command processing duration");

            _commandErrors = _meter.CreateCounter<long>(
                "mcp.commands.errors",
                description: "Total processing errors");

            _activeConnections = _meter.CreateUpDownCounter<int>(
                "mcp.connections.active",
                description: "Active connections");
        }

        public void RecordCommandProcessed(string command, ProtocolType protocol)
        {
            _commandsProcessed.Add(1,
                new KeyValuePair<string, object?>("command", command),
                new KeyValuePair<string, object?>("protocol", protocol.ToString()));
        }

        public void RecordCommandDuration(string command, ProtocolType protocol, double durationMs)
        {
            _commandDuration.Record(durationMs,
                new KeyValuePair<string, object?>("command", command),
                new KeyValuePair<string, object?>("protocol", protocol.ToString()));
        }

        public void RecordCommandError(string command, ProtocolType protocol, string errorType)
        {
            _commandErrors.Add(1,
                new KeyValuePair<string, object?>("command", command),
                new KeyValuePair<string, object?>("protocol", protocol.ToString()),
                new KeyValuePair<string, object?>("error_type", errorType));
        }

        public void IncrementActiveConnections(ProtocolType protocol)
        {
            _activeConnections.Add(1,
                new KeyValuePair<string, object?>("protocol", protocol.ToString()));
        }

        public void DecrementActiveConnections(ProtocolType protocol)
        {
            _activeConnections.Add(-1,
                new KeyValuePair<string, object?>("protocol", protocol.ToString()));
        }
    }

📊 Dashboard with Grafana

Prometheus Configuration

    # prometheus.yml
    global:
      scrape_interval: 15s
      evaluation_interval: 15s

    scrape_configs:
      - job_name: 'mcp-grpc-service'
        static_configs:
          - targets: ['localhost:5001']

      - job_name: 'mcp-websocket-service'
        static_configs:
          - targets: ['localhost:5002']

      - job_name: 'mcp-gateway'
        static_configs:
          - targets: ['localhost:5000']

Grafana Dashboard JSON

    {
      "dashboard": {
        "title": "MCP Hybrid Protocol Dashboard",
        "panels": [
          {
            "title": "Commands Processed per Protocol",
            "targets": [
              {
                "expr": "rate(mcp_commands_processed_total[5m])",
                "legendFormat": "{{protocol}}"
              }
            ]
          },
          {
            "title": "p95 Latency per Command",
            "targets": [
              {
                "expr": "histogram_quantile(0.95, rate(mcp_command_duration_bucket[5m]))",
                "legendFormat": "p95 - {{command}}"
              }
            ]
          },
          {
            "title": "Active Connections",
            "targets": [
              {
                "expr": "mcp_connections_active",
                "legendFormat": "{{protocol}}"
              }
            ]
          },
          {
            "title": "Error Rate",
            "targets": [
              {
                "expr": "rate(mcp_commands_errors_total[5m])",
                "legendFormat": "{{protocol}} - {{error_type}}"
              }
            ]
          }
        ]
      }
    }

🧪 Integration Tests

Hybrid Test

// MCPPipeline.IntegrationTests/HybridProtocolTests.cs
using MCPPipeline.Adapter;
using MCPPipeline.Contracts.Messages;
using MCPPipeline.Core.Events;
using MCPPipeline.Core.Services;
using MCPPipeline.GrpcService.Services;
using MCPPipeline.WebSocketService.Hubs;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Moq;
using Xunit;

public class HybridProtocolTests : IClassFixture<MCPTestFixture>
{
    private readonly MCPTestFixture _fixture;

    public HybridProtocolTests(MCPTestFixture fixture)
    {
        _fixture = fixture;
    }

    [Fact]
    public async Task Should_Process_Same_Command_Via_Both_Protocols()
    {
        // Arrange
        var grpcClient = _fixture.CreateGrpcClient();
        var wsClient = _fixture.CreateWebSocketClient();
        await wsClient.ConnectAsync();

        var testPayload = "Teste de integração híbrida";

        // Act
        var grpcResponse = await grpcClient.SendCommandAsync("analyze", testPayload);
        var wsResponse = await wsClient.SendCommandAsync("analyze", testPayload);

        // Assert
        Assert.Equal(ResponseStatus.Success, grpcResponse.Status);
        Assert.Equal(ResponseStatus.Success, wsResponse.Status);

        // Both protocols should return equivalent results
        Assert.Contains("palavras", grpcResponse.Result);
        Assert.Contains("palavras", wsResponse.Result);
    }

    [Fact]
    public async Task Should_Handle_Concurrent_Requests_From_Both_Protocols()
    {
        // Arrange
        var grpcClient = _fixture.CreateGrpcClient();
        var wsClient = _fixture.CreateWebSocketClient();
        await wsClient.ConnectAsync();

        var tasks = new List<Task<MCPResponse>>();

        // Act - send 50 concurrent requests over each protocol
        for (int i = 0; i < 50; i++)
        {
            tasks.Add(grpcClient.SendCommandAsync("status", $"gRPC-{i}"));
            tasks.Add(wsClient.SendCommandAsync("status", $"WS-{i}"));
        }

        var responses = await Task.WhenAll(tasks);

        // Assert
        Assert.All(responses, r => Assert.Equal(ResponseStatus.Success, r.Status));
        Assert.Equal(100, responses.Length);
    }

    [Fact]
    public async Task Should_Broadcast_Events_Between_Protocols()
    {
        // Arrange
        var grpcClient = _fixture.CreateGrpcClient();
        var wsClient = _fixture.CreateWebSocketClient();
        await wsClient.ConnectAsync();

        var eventReceived = new TaskCompletionSource<bool>();

        // Set up the WebSocket listener here; it must call
        // eventReceived.TrySetResult(true) when the notification arrives
        // (implementation depends on the event bus)

        // Act - send the command via gRPC
        await grpcClient.SendCommandAsync("generate", "Teste de evento");

        // Assert - the WebSocket client should receive the notification
        var received = await eventReceived.Task.WaitAsync(TimeSpan.FromSeconds(5));
        Assert.True(received);
    }
}

public class MCPTestFixture : IAsyncLifetime
{
    private WebApplication? _grpcApp;
    private WebApplication? _wsApp;

    public async Task InitializeAsync()
    {
        // Start the test services
        _grpcApp = CreateGrpcService();
        _wsApp = CreateWebSocketService();

        await _grpcApp.StartAsync();
        await _wsApp.StartAsync();
    }

    public async Task DisposeAsync()
    {
        if (_grpcApp != null) await _grpcApp.StopAsync();
        if (_wsApp != null) await _wsApp.StopAsync();
    }

    public UnifiedMCPClient CreateGrpcClient()
    {
        return new UnifiedMCPClient(
            "https://localhost:5001",
            useGrpc: true,
            Mock.Of<ILogger<UnifiedMCPClient>>());
    }

    public UnifiedMCPClient CreateWebSocketClient()
    {
        return new UnifiedMCPClient(
            "https://localhost:5002/mcphub",
            useGrpc: false,
            Mock.Of<ILogger<UnifiedMCPClient>>());
    }

    private WebApplication CreateGrpcService()
    {
        var builder = WebApplication.CreateBuilder();
        // Listen on the address the test clients connect to
        builder.WebHost.UseUrls("https://localhost:5001");
        builder.Services.AddGrpc();
        // MCPKernelService depends on IEventBus; a mock is enough for the first
        // two tests, while the broadcast test needs a real implementation
        builder.Services.AddSingleton(Mock.Of<IEventBus>());
        builder.Services.AddSingleton<IMCPKernelService, MCPKernelService>();
        builder.Services.AddSingleton<IProtocolAdapter, ProtocolAdapter>();

        var app = builder.Build();
        app.MapGrpcService<MCPGrpcServiceImpl>();
        return app;
    }

    private WebApplication CreateWebSocketService()
    {
        var builder = WebApplication.CreateBuilder();
        builder.WebHost.UseUrls("https://localhost:5002");
        builder.Services.AddSignalR();
        builder.Services.AddSingleton(Mock.Of<IEventBus>());
        builder.Services.AddSingleton<IMCPKernelService, MCPKernelService>();

        var app = builder.Build();
        app.MapHub<MCPHub>("/mcphub");
        return app;
    }
}
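
The broadcast test leaves its listener as a placeholder, so nothing ever completes `eventReceived` and the wait can only time out. Here is a minimal sketch of the completion pattern, assuming a hypothetical in-process `FakeEventSource` in place of the real WebSocket subscription (neither `FakeEventSource` nor `ListenerSketch` is part of the article's code):

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-in for whatever raises cross-protocol notifications.
public sealed class FakeEventSource
{
    public event Action<string>? EventReceived;
    public void Raise(string eventType) => EventReceived?.Invoke(eventType);
}

public static class ListenerSketch
{
    // Completes with true when the expected event type arrives;
    // throws TimeoutException if it does not arrive within the timeout.
    public static async Task<bool> WaitForEventAsync(
        FakeEventSource source, string expectedType, TimeSpan timeout, Action trigger)
    {
        var tcs = new TaskCompletionSource<bool>(
            TaskCreationOptions.RunContinuationsAsynchronously);

        void Handler(string type)
        {
            if (type == expectedType) tcs.TrySetResult(true);
        }

        // Subscribe before triggering so the event cannot be missed.
        source.EventReceived += Handler;
        try
        {
            trigger(); // in the real test: send the command via gRPC
            return await tcs.Task.WaitAsync(timeout);
        }
        finally
        {
            source.EventReceived -= Handler;
        }
    }
}
```

The important detail is the ordering: the handler is registered before the command is sent, and it is removed again in `finally` so later tests do not see stale subscriptions.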

🚀 Performance Testing

Benchmark with BenchmarkDotNet

// MCPPipeline.Benchmarks/ProtocolBenchmarks.cs
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Running;
using MCPPipeline.Contracts.Messages;
using Microsoft.Extensions.Logging;
using Moq;

[MemoryDiagnoser]
[SimpleJob(warmupCount: 3, iterationCount: 10)]
public class ProtocolBenchmarks
{
    private UnifiedMCPClient _grpcClient = null!;
    private UnifiedMCPClient _wsClient = null!;

    [GlobalSetup]
    public async Task Setup()
    {
        _grpcClient = new UnifiedMCPClient(
            "https://localhost:5001",
            useGrpc: true,
            Mock.Of<ILogger<UnifiedMCPClient>>());

        _wsClient = new UnifiedMCPClient(
            "https://localhost:5002/mcphub",
            useGrpc: false,
            Mock.Of<ILogger<UnifiedMCPClient>>());

        await _wsClient.ConnectAsync();
    }

    [Benchmark(Baseline = true)]
    public async Task<MCPResponse> GrpcCommand()
    {
        return await _grpcClient.SendCommandAsync("status", "benchmark");
    }

    [Benchmark]
    public async Task<MCPResponse> WebSocketCommand()
    {
        return await _wsClient.SendCommandAsync("status", "benchmark");
    }

    [Benchmark]
    public async Task GrpcStreaming()
    {
        await foreach (var _ in _grpcClient.StreamCommandAsync("generate", "test"))
        {
            // Consume the stream
        }
    }

    [Benchmark]
    public async Task WebSocketStreaming()
    {
        await foreach (var _ in _wsClient.StreamCommandAsync("generate", "test"))
        {
            // Consume the stream
        }
    }

    [GlobalCleanup]
    public async Task Cleanup()
    {
        await _grpcClient.DisposeAsync();
        await _wsClient.DisposeAsync();
    }
}

// Program.cs
public class Program
{
    public static void Main(string[] args)
    {
        BenchmarkRunner.Run<ProtocolBenchmarks>();
    }
}

Expected Results

| Method             | Mean      | Error     | StdDev    | Ratio | Gen0 | Allocated |
|------------------- |----------:|----------:|----------:|------:|-----:|----------:|
| GrpcCommand        |  2.450 ms | 0.0421 ms | 0.0394 ms |  1.00 | 15.6 |     128KB |
| WebSocketCommand   |  8.732 ms | 0.1523 ms | 0.1425 ms |  3.56 | 31.2 |     256KB |
| GrpcStreaming      | 12.105 ms | 0.2103 ms | 0.1967 ms |  4.94 | 46.8 |     384KB |
| WebSocketStreaming | 18.421 ms | 0.3142 ms | 0.2940 ms |  7.52 | 62.5 |     512KB |
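
To read the Ratio column: each value is the method's mean divided by the mean of the baseline (`GrpcCommand`, marked with `Baseline = true`). The small sketch below reproduces that arithmetic from the expected means in the table; the numbers are the article's expected values, not new measurements, and `RatioSketch` is a hypothetical helper name.

```csharp
using System;

public static class RatioSketch
{
    // Ratio = mean of the method / mean of the baseline, rounded to 2 decimals.
    public static double Ratio(double meanMs, double baselineMs) =>
        Math.Round(meanMs / baselineMs, 2);

    public static void PrintTable()
    {
        const double baseline = 2.450; // GrpcCommand mean in ms
        var means = new (string Method, double MeanMs)[]
        {
            ("GrpcCommand", 2.450),
            ("WebSocketCommand", 8.732),
            ("GrpcStreaming", 12.105),
            ("WebSocketStreaming", 18.421)
        };

        foreach (var (method, meanMs) in means)
            Console.WriteLine($"{method}: {Ratio(meanMs, baseline):F2}");
    }
}
```

So a ratio of 3.56 for `WebSocketCommand` means the expected mean is about three and a half times the gRPC baseline, not that it is 3.56 ms slower.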

🎯 Protocol Decision: When to Use Each One

Decision Matrix

public class ProtocolSelector
{
    public ProtocolType SelectOptimalProtocol(ClientContext context)
    {
        // 1. Is the client a browser? → WebSocket
        if (context.IsWebBrowser)
            return ProtocolType.WebSocket;

        // 2. Internal service-to-service communication? → gRPC
        if (context.IsInternalService)
            return ProtocolType.Grpc;

        // 3. Needs long-lived bidirectional streaming? → WebSocket
        if (context.RequiresLongLivedStream)
            return ProtocolType.WebSocket;

        // 4. Is minimal latency the priority? → gRPC
        if (context.LatencySensitive)
            return ProtocolType.Grpc;

        // 5. Do multiple clients need to receive broadcasts? → WebSocket
        if (context.RequiresBroadcast)
            return ProtocolType.WebSocket;

        // 6. Default: gRPC for performance
        return ProtocolType.Grpc;
    }
}

public record ClientContext
{
    public bool IsWebBrowser { get; init; }
    public bool IsInternalService { get; init; }
    public bool RequiresLongLivedStream { get; init; }
    public bool LatencySensitive { get; init; }
    public bool RequiresBroadcast { get; init; }
}
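
Because the rules are checked from top to bottom, the first match wins, which matters when a client sets more than one flag. The sketch below condenses the same rule order into a hypothetical `SelectorSketch.Select` helper (the types are repeated only so the example compiles on its own):

```csharp
using System;

public enum ProtocolType { Unknown = 0, Grpc = 1, WebSocket = 2, Http = 3 }

public record ClientContext
{
    public bool IsWebBrowser { get; init; }
    public bool IsInternalService { get; init; }
    public bool RequiresLongLivedStream { get; init; }
    public bool LatencySensitive { get; init; }
    public bool RequiresBroadcast { get; init; }
}

public static class SelectorSketch
{
    // Same rule order as ProtocolSelector, written as one expression.
    public static ProtocolType Select(ClientContext c) =>
        c.IsWebBrowser ? ProtocolType.WebSocket
        : c.IsInternalService ? ProtocolType.Grpc
        : c.RequiresLongLivedStream ? ProtocolType.WebSocket
        : c.LatencySensitive ? ProtocolType.Grpc
        : c.RequiresBroadcast ? ProtocolType.WebSocket
        : ProtocolType.Grpc;
}
```

For instance, `Select(new ClientContext { IsWebBrowser = true, LatencySensitive = true })` yields `WebSocket` because the browser rule comes first, while `Select(new ClientContext { LatencySensitive = true, RequiresBroadcast = true })` yields `Grpc` because latency is checked before broadcast. If broadcast should take priority in your system, the rule order is the thing to change.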

Comparison Table

| Scenario | gRPC | WebSocket | Recommendation |
|----------|------|-----------|----------------|
| Backend API → Backend | ✅ Ideal | ⚠️ Overhead | gRPC |
| Web application → Backend | ❌ Limited | ✅ Native | WebSocket |
| Mobile app → Backend | ✅ Excellent | ✅ Good | gRPC (better performance) |
| Real-time dashboard | ⚠️ Polling | ✅ Native push | WebSocket |
| Multi-user chat | ❌ Not ideal | ✅ Perfect | WebSocket |
| Data analysis | ✅ High performance | ⚠️ Slower | gRPC |
| AI streaming | ✅ Efficient | ✅ Functional | Both (context-dependent) |

🔒 Security in a Hybrid Environment

Unified Authentication

// MCPPipeline.Security/UnifiedAuthenticationExtensions.cs
using System.Text;
using Grpc.Core;
using Grpc.Core.Interceptors;
using Microsoft.AspNetCore.Authentication.JwtBearer;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.IdentityModel.Tokens;

public static class UnifiedAuthenticationExtensions
{
    public static IServiceCollection AddUnifiedAuthentication(
        this IServiceCollection services,
        IConfiguration configuration)
    {
        services.AddAuthentication(options =>
        {
            options.DefaultAuthenticateScheme = JwtBearerDefaults.AuthenticationScheme;
            options.DefaultChallengeScheme = JwtBearerDefaults.AuthenticationScheme;
        })
        .AddJwtBearer(options =>
        {
            options.TokenValidationParameters = new TokenValidationParameters
            {
                ValidateIssuer = true,
                ValidateAudience = true,
                ValidateLifetime = true,
                ValidateIssuerSigningKey = true,
                ValidIssuer = configuration["Jwt:Issuer"],
                ValidAudience = configuration["Jwt:Audience"],
                IssuerSigningKey = new SymmetricSecurityKey(
                    Encoding.UTF8.GetBytes(configuration["Jwt:Key"]!))
            };

            // Support passing the WebSocket token via query string
            options.Events = new JwtBearerEvents
            {
                OnMessageReceived = context =>
                {
                    var accessToken = context.Request.Query["access_token"];
                    var path = context.HttpContext.Request.Path;

                    if (!string.IsNullOrEmpty(accessToken) &&
                        path.StartsWithSegments("/mcphub"))
                    {
                        context.Token = accessToken;
                    }

                    return Task.CompletedTask;
                }
            };
        });

        // Register the metadata interceptor for gRPC
        services.AddGrpc(options =>
        {
            options.Interceptors.Add<AuthenticationInterceptor>();
        });

        return services;
    }
}

// Interceptor for gRPC
public class AuthenticationInterceptor : Interceptor
{
    private readonly ILogger<AuthenticationInterceptor> _logger;

    public AuthenticationInterceptor(ILogger<AuthenticationInterceptor> logger)
    {
        _logger = logger;
    }

    public override async Task<TResponse> UnaryServerHandler<TRequest, TResponse>(
        TRequest request,
        ServerCallContext context,
        UnaryServerMethod<TRequest, TResponse> continuation)
    {
        var authHeader = context.RequestHeaders.GetValue("authorization");

        if (string.IsNullOrEmpty(authHeader))
        {
            _logger.LogWarning("Requisição gRPC sem autenticação");
            throw new RpcException(new Status(StatusCode.Unauthenticated, "Token não fornecido"));
        }

        // Validate the token (validation logic still has to be implemented;
        // at this point only the presence of the header has been checked)
        _logger.LogInformation("Token gRPC validado para peer: {Peer}", context.Peer);

        return await continuation(request, context);
    }
}
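
Note that the interceptor only checks that an `authorization` header exists; the validation itself is left as a placeholder. As a first step toward filling it in, the sketch below extracts the raw token from a `Bearer <token>` header. `BearerTokenSketch` is a hypothetical helper, and it does not verify the signature; that would still have to go through the same `TokenValidationParameters` configured above (for example with `JwtSecurityTokenHandler.ValidateToken`).

```csharp
using System;

public static class BearerTokenSketch
{
    private const string Prefix = "Bearer ";

    // Returns true and the raw token when the header has the form "Bearer <token>".
    // This only parses the header; it does not validate the token.
    public static bool TryExtract(string? authHeader, out string token)
    {
        token = string.Empty;

        if (string.IsNullOrWhiteSpace(authHeader))
            return false;

        if (!authHeader.StartsWith(Prefix, StringComparison.OrdinalIgnoreCase))
            return false;

        token = authHeader[Prefix.Length..].Trim();
        return token.Length > 0;
    }
}
```

Inside the interceptor, a failed `TryExtract` would map to the same `StatusCode.Unauthenticated` response that the missing-header case already returns.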

Unified Rate Limiting

// MCPPipeline.Security/RateLimitingExtensions.cs
using System.Threading.RateLimiting;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.RateLimiting;
using Microsoft.Extensions.DependencyInjection;

public static class RateLimitingExtensions
{
    public static IServiceCollection AddUnifiedRateLimiting(
        this IServiceCollection services)
    {
        services.AddRateLimiter(options =>
        {
            // Policy for gRPC
            options.AddFixedWindowLimiter("grpc", limiterOptions =>
            {
                limiterOptions.PermitLimit = 100;
                limiterOptions.Window = TimeSpan.FromMinutes(1);
                limiterOptions.QueueProcessingOrder = QueueProcessingOrder.OldestFirst;
                limiterOptions.QueueLimit = 10;
            });

            // Policy for WebSocket
            options.AddSlidingWindowLimiter("websocket", limiterOptions =>
            {
                limiterOptions.PermitLimit = 60;
                limiterOptions.Window = TimeSpan.FromMinutes(1);
                limiterOptions.SegmentsPerWindow = 4;
                limiterOptions.QueueProcessingOrder = QueueProcessingOrder.OldestFirst;
                limiterOptions.QueueLimit = 5;
            });

            options.OnRejected = async (context, token) =>
            {
                context.HttpContext.Response.StatusCode = 429;
                await context.HttpContext.Response.WriteAsJsonAsync(new
                {
                    error = "Rate limit exceeded",
                    // The cast gives both branches of the conditional a common type
                    retryAfter = context.Lease.TryGetMetadata(
                        MetadataName.RetryAfter, out var retryAfter)
                        ? (double?)retryAfter.TotalSeconds
                        : null
                }, token);
            };
        });

        return services;
    }
}
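
To get a feel for what the fixed-window policy does, the sketch below drives a `FixedWindowRateLimiter` directly, outside the ASP.NET Core middleware. It assumes `System.Threading.RateLimiting` is available (it ships with ASP.NET Core; a plain console project would need the package), and `RateLimitSketch` is a hypothetical helper name.

```csharp
using System;
using System.Threading.RateLimiting;

public static class RateLimitSketch
{
    // Makes `attempts` acquisitions within one window and counts the accepted ones.
    public static int CountAccepted(int permitLimit, int attempts)
    {
        using var limiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions
        {
            PermitLimit = permitLimit,
            Window = TimeSpan.FromMinutes(1),
            QueueLimit = 0,            // reject immediately instead of queueing
            AutoReplenishment = false  // no background timer needed for the example
        });

        var accepted = 0;
        for (var i = 0; i < attempts; i++)
        {
            using RateLimitLease lease = limiter.AttemptAcquire();
            if (lease.IsAcquired) accepted++;
        }

        return accepted;
    }
}
```

With a limit of 3 and 5 attempts inside the same window, only the first 3 are accepted; the rest are the requests that would reach `OnRejected` and receive the 429 response in the configuration above.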

🏗️ Docker Compose for the Full Environment

# docker-compose.yml
version: '3.8'

services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    command: redis-server --appendonly yes
    volumes:
      - redis-data:/data

  jaeger:
    image: jaegertracing/all-in-one:latest
    ports:
      - "16686:16686"  # UI
      - "4317:4317"    # OTLP gRPC
      - "4318:4318"    # OTLP HTTP
    environment:
      - COLLECTOR_OTLP_ENABLED=true

  prometheus:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
      - prometheus-data:/prometheus
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
      - '--storage.tsdb.path=/prometheus'

  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    volumes:
      - grafana-data:/var/lib/grafana
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
      - GF_USERS_ALLOW_SIGN_UP=false

  mcp-grpc-service:
    build:
      context: .
      dockerfile: src/MCPPipeline.GrpcService/Dockerfile
    ports:
      - "5001:5001"
    environment:
      - ASPNETCORE_ENVIRONMENT=Development
      - Redis__ConnectionString=redis:6379
      - Jaeger__AgentHost=jaeger
      - Jaeger__AgentPort=4317
    depends_on:
      - redis
      - jaeger

  mcp-websocket-service:
    build:
      context: .
      dockerfile: src/MCPPipeline.WebSocketService/Dockerfile
    ports:
      - "5002:5002"
    environment:
      - ASPNETCORE_ENVIRONMENT=Development
      - Redis__ConnectionString=redis:6379
      - Jaeger__AgentHost=jaeger
      - Jaeger__AgentPort=4317
    depends_on:
      - redis
      - jaeger

  mcp-gateway:
    build:
      context: .
      dockerfile: src/MCPPipeline.Gateway/Dockerfile
    ports:
      - "5000:5000"
    environment:
      - ASPNETCORE_ENVIRONMENT=Development
    depends_on:
      - mcp-grpc-service
      - mcp-websocket-service

volumes:
  redis-data:
  prometheus-data:
  grafana-data:

📈 Scalability Strategies

Kubernetes Deployment

# k8s/mcp-deployment.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: mcp-config
data:
  redis-connection: "redis-service:6379"
  jaeger-endpoint: "http://jaeger-collector:4317"
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: mcp-grpc-service
spec:
  replicas: 3
  selector:
    matchLabels:
      app: mcp-grpc
  template:
    metadata:
      labels:
        app: mcp-grpc
    spec:
      containers:
        - name: mcp-grpc
          image: mcp-grpc-service:latest


Main Components

  1. Protocol Orchestrator - Decides which protocol to use based on context
  2. Protocol Adapter Layer - Converts messages between gRPC and WebSocket
  3. Unified MCP Kernel - Processes commands independently of the protocol
  4. Event Bus - Synchronizes events across instances and protocols
  5. Observability Layer - Unified tracing for both protocols

🏗️ Implementing the Hybrid Architecture

1️⃣ Project Structure

MCPPipeline.Hybrid/
├── src/
│   ├── MCPPipeline.Contracts/           # Shared models
│   │   ├── Messages/
│   │   │   ├── MCPCommand.cs
│   │   │   ├── MCPResponse.cs
│   │   │   └── MCPEvent.cs
│   │   └── Protos/
│   │       └── mcp.proto
│   │
│   ├── MCPPipeline.Core/                # Business logic
│   │   ├── Services/
│   │   │   ├── IMCPKernelService.cs
│   │   │   └── MCPKernelService.cs
│   │   └── Events/
│   │       └── IEventBus.cs
│   │
│   ├── MCPPipeline.GrpcService/         # gRPC endpoint
│   │   ├── Services/
│   │   │   └── MCPGrpcService.cs
│   │   └── Program.cs
│   │
│   ├── MCPPipeline.WebSocketService/    # WebSocket endpoint
│   │   ├── Hubs/
│   │   │   └── MCPHub.cs
│   │   └── Program.cs
│   │
│   ├── MCPPipeline.Gateway/             # API Gateway (YARP)
│   │   ├── Configuration/
│   │   └── Program.cs
│   │
│   └── MCPPipeline.Adapter/             # Protocol Adapter
│       ├── GrpcToWebSocketAdapter.cs
│       └── WebSocketToGrpcAdapter.cs
│
└── tests/
    └── MCPPipeline.IntegrationTests/

2️⃣ Shared Contracts

// MCPPipeline.Contracts/Messages/MCPCommand.cs
namespace MCPPipeline.Contracts.Messages;

public record MCPCommand
{
    public string CommandId { get; init; } = Guid.NewGuid().ToString();
    public string Command { get; init; } = string.Empty;
    public string Payload { get; init; } = string.Empty;
    public Dictionary<string, string> Metadata { get; init; } = new();
    public DateTime Timestamp { get; init; } = DateTime.UtcNow;
    public string SessionId { get; init; } = string.Empty;
    public ProtocolType Protocol { get; init; }
    public int Priority { get; init; } = 0;
}

public record MCPResponse
{
    public string CommandId { get; init; } = string.Empty;
    public string Result { get; init; } = string.Empty;
    public ResponseStatus Status { get; init; }
    public string? ErrorMessage { get; init; }
    public long ProcessingTimeMs { get; init; }
    public DateTime Timestamp { get; init; } = DateTime.UtcNow;
    public Dictionary<string, object> Metrics { get; init; } = new();
}

public record MCPEvent
{
    public string EventId { get; init; } = Guid.NewGuid().ToString();
    public string EventType { get; init; } = string.Empty;
    public string Source { get; init; } = string.Empty;
    public object Data { get; init; } = new();
    public DateTime Timestamp { get; init; } = DateTime.UtcNow;
}

public enum ProtocolType
{
    Unknown = 0,
    Grpc = 1,
    WebSocket = 2,
    Http = 3
}

public enum ResponseStatus
{
    Success,
    Error,
    Processing,
    Timeout
}
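
Two properties of these contracts are worth seeing in action. Records are immutable, so the endpoints enrich a command with a `with` expression that returns a copy; and `System.Text.Json` writes enums as numbers unless a `JsonStringEnumConverter` is added, which affects what a JSON client sees in fields such as `Protocol` or `Status`. The sketch uses a trimmed-down, hypothetical `CommandSketch` record rather than the full `MCPCommand`:

```csharp
using System;
using System.Text.Json;
using System.Text.Json.Serialization;

public enum ProtocolType { Unknown = 0, Grpc = 1, WebSocket = 2, Http = 3 }

// Trimmed-down stand-in for MCPCommand, just for this example.
public record CommandSketch
{
    public string Command { get; init; } = string.Empty;
    public string SessionId { get; init; } = string.Empty;
    public ProtocolType Protocol { get; init; }
}

public static class ContractsSketch
{
    // Returns a modified copy; the original record instance is not changed.
    public static CommandSketch Enrich(CommandSketch command, string sessionId) =>
        command with { Protocol = ProtocolType.WebSocket, SessionId = sessionId };

    // Serializes with enums either as numbers (the default) or as strings.
    public static string ToJson(CommandSketch command, bool enumsAsStrings)
    {
        var options = new JsonSerializerOptions();
        if (enumsAsStrings)
            options.Converters.Add(new JsonStringEnumConverter());

        return JsonSerializer.Serialize(command, options);
    }
}
```

Whether to register the string converter is a design choice: numbers are more compact, while names are easier to read in browser tooling and survive reordering of the enum.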

3️⃣ Unified MCP Kernel

// MCPPipeline.Core/Services/MCPKernelService.cs
using System.Diagnostics;
using System.Runtime.CompilerServices;
using MCPPipeline.Contracts.Messages;
using MCPPipeline.Core.Events;
using Microsoft.Extensions.Logging;

namespace MCPPipeline.Core.Services;

public interface IMCPKernelService
{
    Task<MCPResponse> ExecuteCommandAsync(MCPCommand command, CancellationToken ct);
    IAsyncEnumerable<string> StreamCommandAsync(MCPCommand command, CancellationToken ct);
}

public class MCPKernelService : IMCPKernelService
{
    private readonly ILogger<MCPKernelService> _logger;
    private readonly IEventBus _eventBus;
    private static readonly ActivitySource ActivitySource = new("MCPPipeline.Core");

    public MCPKernelService(
        ILogger<MCPKernelService> logger,
        IEventBus eventBus)
    {
        _logger = logger;
        _eventBus = eventBus;
    }

    public async Task<MCPResponse> ExecuteCommandAsync(
        MCPCommand command,
        CancellationToken ct)
    {
        using var activity = ActivitySource.StartActivity("ExecuteCommand");
        activity?.SetTag("command.id", command.CommandId);
        activity?.SetTag("command.type", command.Command);
        activity?.SetTag("protocol", command.Protocol.ToString());

        var sw = Stopwatch.StartNew();

        try
        {
            _logger.LogInformation(
                "Executando comando {Command} via {Protocol} | Session: {SessionId}",
                command.Command, command.Protocol, command.SessionId);

            // Publish the start event
            await _eventBus.PublishAsync(new MCPEvent
            {
                EventType = "CommandStarted",
                Source = command.Protocol.ToString(),
                Data = new { command.CommandId, command.Command }
            });

            // Process the command
            var result = await ProcessCommandAsync(command, ct);

            sw.Stop();

            var response = new MCPResponse
            {
                CommandId = command.CommandId,
                Result = result,
                Status = ResponseStatus.Success,
                ProcessingTimeMs = sw.ElapsedMilliseconds,
                Metrics = new Dictionary<string, object>
                {
                    ["protocol"] = command.Protocol.ToString(),
                    ["priority"] = command.Priority
                }
            };

            // Publish the completion event
            await _eventBus.PublishAsync(new MCPEvent
            {
                EventType = "CommandCompleted",
                Source = command.Protocol.ToString(),
                Data = new { command.CommandId, response.ProcessingTimeMs }
            });

            activity?.SetTag("response.status", "success");
            return response;
        }
        catch (Exception ex)
        {
            sw.Stop();
            _logger.LogError(ex,
                "Erro ao executar comando {Command} via {Protocol}",
                command.Command, command.Protocol);

            activity?.SetTag("response.status", "error");
            activity?.SetTag("error.message", ex.Message);

            await _eventBus.PublishAsync(new MCPEvent
            {
                EventType = "CommandFailed",
                Source = command.Protocol.ToString(),
                Data = new { command.CommandId, Error = ex.Message }
            });

            return new MCPResponse
            {
                CommandId = command.CommandId,
                Status = ResponseStatus.Error,
                ErrorMessage = ex.Message,
                ProcessingTimeMs = sw.ElapsedMilliseconds
            };
        }
    }

    public async IAsyncEnumerable<string> StreamCommandAsync(
        MCPCommand command,
        [EnumeratorCancellation] CancellationToken ct)
    {
        using var activity = ActivitySource.StartActivity("StreamCommand");
        activity?.SetTag("command.id", command.CommandId);

        _logger.LogInformation(
            "Iniciando streaming para comando {Command} via {Protocol}",
            command.Command, command.Protocol);

        var chunks = await GenerateStreamChunksAsync(command.Payload, ct);

        for (int i = 0; i < chunks.Count; i++)
        {
            if (ct.IsCancellationRequested)
                yield break;

            yield return chunks[i];
            await Task.Delay(50, ct); // Simulates processing
        }
    }

    private async Task<string> ProcessCommandAsync(MCPCommand command, CancellationToken ct)
    {
        return command.Command.ToLowerInvariant() switch
        {
            "analyze" => await AnalyzeAsync(command.Payload, ct),
            "summarize" => await SummarizeAsync(command.Payload, ct),
            "translate" => await TranslateAsync(command.Payload, ct),
            "generate" => await GenerateAsync(command.Payload, ct),
            "status" => await GetStatusAsync(ct),
            _ => throw new InvalidOperationException($"Comando desconhecido: {command.Command}")
        };
    }

    private async Task<string> AnalyzeAsync(string payload, CancellationToken ct)
    {
        await Task.Delay(200, ct);
        var words = payload.Split(' ', StringSplitOptions.RemoveEmptyEntries).Length;
        return $"Análise: {words} palavras, {payload.Length} caracteres";
    }

    private async Task<string> SummarizeAsync(string payload, CancellationToken ct)
    {
        await Task.Delay(300, ct);
        return payload.Length > 100
            ? $"Resumo: {payload[..100]}..."
            : $"Resumo: {payload}";
    }

    private async Task<string> TranslateAsync(string payload, CancellationToken ct)
    {
        await Task.Delay(250, ct);
        return $"[Traduzido] {payload}";
    }

    private async Task<string> GenerateAsync(string payload, CancellationToken ct)
    {
        await Task.Delay(500, ct);
        return $"Conteúdo gerado baseado em: {payload}";
    }

    private async Task<string> GetStatusAsync(CancellationToken ct)
    {
        await Task.Delay(50, ct);
        return $"Sistema operacional | Timestamp: {DateTime.UtcNow:O}";
    }

    private async Task<List<string>> GenerateStreamChunksAsync(string prompt, CancellationToken ct)
    {
        await Task.Delay(100, ct);
        return new List<string>
        {
            "Iniciando processamento...",
            "Analisando contexto...",
            "Gerando resposta...",
            $"Resultado: {prompt}",
            "Processamento concluído."
        };
    }
}
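
On the consuming side, a stream like `StreamCommandAsync` can end in two ways when the token is cancelled: the iterator may simply stop, or the pending `Task.Delay` may throw `OperationCanceledException`. The sketch below shows a consumer that handles both, using a hypothetical `ProduceAsync` as a stand-in for the kernel's stream:

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class StreamSketch
{
    // Stand-in for StreamCommandAsync: yields chunks and honours cancellation.
    public static async IAsyncEnumerable<string> ProduceAsync(
        int count, [EnumeratorCancellation] CancellationToken ct = default)
    {
        for (var i = 0; i < count; i++)
        {
            await Task.Delay(10, ct);
            yield return $"chunk-{i}";
        }
    }

    // Reads chunks until `stopAfter` have arrived, then cancels the producer.
    public static async Task<List<string>> ConsumeAsync(int stopAfter)
    {
        using var cts = new CancellationTokenSource();
        var received = new List<string>();

        try
        {
            // WithCancellation flows the token into the [EnumeratorCancellation] parameter.
            await foreach (var chunk in ProduceAsync(100).WithCancellation(cts.Token))
            {
                received.Add(chunk);
                if (received.Count == stopAfter)
                    cts.Cancel();
            }
        }
        catch (OperationCanceledException)
        {
            // Expected here: the producer's Task.Delay observed the cancelled token.
        }

        return received;
    }
}
```

The same shape applies to the gRPC and SignalR endpoints, where the token comes from `context.CancellationToken` or `Context.ConnectionAborted` instead of a local `CancellationTokenSource`.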

4️⃣ Event Bus with Redis

// MCPPipeline.Core/Events/IEventBus.cs
namespace MCPPipeline.Core.Events;

public interface IEventBus
{
    Task PublishAsync<T>(T @event) where T : class;
    Task SubscribeAsync<T>(Func<T, Task> handler) where T : class;
}

// MCPPipeline.Core/Events/RedisEventBus.cs
using System.Text.Json;
using Microsoft.Extensions.Logging;
using StackExchange.Redis;

namespace MCPPipeline.Core.Events;

public class RedisEventBus : IEventBus
{
    private readonly IConnectionMultiplexer _redis;
    private readonly ILogger<RedisEventBus> _logger;
    private readonly ISubscriber _subscriber;

    public RedisEventBus(
        IConnectionMultiplexer redis,
        ILogger<RedisEventBus> logger)
    {
        _redis = redis;
        _logger = logger;
        _subscriber = redis.GetSubscriber();
    }

    public async Task PublishAsync<T>(T @event) where T : class
    {
        // The channel name is the event's type name
        var channel = typeof(T).Name;
        var message = JsonSerializer.Serialize(@event);

        await _subscriber.PublishAsync(channel, message);

        _logger.LogDebug("Evento publicado: {EventType}", channel);
    }

    public async Task SubscribeAsync<T>(Func<T, Task> handler) where T : class
    {
        var channel = typeof(T).Name;

        await _subscriber.SubscribeAsync(channel, async (ch, message) =>
        {
            try
            {
                var @event = JsonSerializer.Deserialize<T>(message!);
                if (@event != null)
                {
                    await handler(@event);
                }
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Erro ao processar evento {EventType}", channel);
            }
        });

        _logger.LogInformation("Inscrito no canal: {Channel}", channel);
    }
}
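
Since the channel name comes from `typeof(T).Name`, every `MCPEvent` travels on the same channel and subscribers have to filter on `EventType` themselves. For tests that should not depend on a running Redis, an in-process implementation of the same interface can help. The `InMemoryEventBus` below is a hypothetical test double, not part of the article's code; the interface is repeated only so the sketch compiles on its own.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public interface IEventBus
{
    Task PublishAsync<T>(T @event) where T : class;
    Task SubscribeAsync<T>(Func<T, Task> handler) where T : class;
}

// Hypothetical test double: same contract as RedisEventBus, but in-process.
public sealed class InMemoryEventBus : IEventBus
{
    // Handlers are grouped by event type, mirroring the per-type Redis channel.
    private readonly ConcurrentDictionary<Type, ConcurrentBag<Func<object, Task>>> _handlers = new();

    public async Task PublishAsync<T>(T @event) where T : class
    {
        if (!_handlers.TryGetValue(typeof(T), out var handlers))
            return; // nobody subscribed to this type

        foreach (var handler in handlers.ToArray())
            await handler(@event);
    }

    public Task SubscribeAsync<T>(Func<T, Task> handler) where T : class
    {
        var bag = _handlers.GetOrAdd(typeof(T), _ => new ConcurrentBag<Func<object, Task>>());
        bag.Add(e => handler((T)e));
        return Task.CompletedTask;
    }
}
```

Unlike the Redis version, this one skips JSON serialization and only reaches subscribers in the same process, so it is suited to unit tests rather than to verifying cross-instance delivery.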

5️⃣ Protocol Adapter

// MCPPipeline.Adapter/ProtocolAdapter.cs
using MCPPipeline.Contracts.Messages;
using MCPPipeline.Grpc;

namespace MCPPipeline.Adapter;

public interface IProtocolAdapter
{
    MCPCommand FromGrpcRequest(MCPRequest grpcRequest);
    MCPRequest ToGrpcRequest(MCPCommand command);
    MCPResponse FromGrpcResponse(MCPGrpcResponse grpcResponse);
    MCPGrpcResponse ToGrpcResponse(MCPResponse response);
}

public class ProtocolAdapter : IProtocolAdapter
{
    public MCPCommand FromGrpcRequest(MCPRequest grpcRequest)
    {
        return new MCPCommand
        {
            Command = grpcRequest.Command,
            Payload = grpcRequest.Payload,
            Metadata = grpcRequest.Metadata.ToDictionary(k => k.Key, v => v.Value),
            // UtcDateTime keeps DateTimeKind.Utc, so the value round-trips unchanged
            Timestamp = DateTimeOffset.FromUnixTimeMilliseconds(grpcRequest.Timestamp).UtcDateTime,
            Protocol = ProtocolType.Grpc
        };
    }

    public MCPRequest ToGrpcRequest(MCPCommand command)
    {
        return new MCPRequest
        {
            Command = command.Command,
            Payload = command.Payload,
            Metadata = { command.Metadata },
            Timestamp = new DateTimeOffset(command.Timestamp).ToUnixTimeMilliseconds()
        };
    }

    public MCPResponse FromGrpcResponse(MCPGrpcResponse grpcResponse)
    {
        return new MCPResponse
        {
            Result = grpcResponse.Result,
            Status = grpcResponse.Status switch
            {
                "OK" => ResponseStatus.Success,
                "ERROR" => ResponseStatus.Error,
                "PROCESSING" => ResponseStatus.Processing,
                "TIMEOUT" => ResponseStatus.Timeout,
                _ => ResponseStatus.Error
            },
            ErrorMessage = grpcResponse.ErrorMessage,
            ProcessingTimeMs = grpcResponse.ProcessingTimeMs
        };
    }

    public MCPGrpcResponse ToGrpcResponse(MCPResponse response)
    {
        return new MCPGrpcResponse
        {
            Result = response.Result,
            // Must mirror FromGrpcResponse: Success travels as "OK", not "SUCCESS",
            // otherwise a successful response is read back as an error
            Status = response.Status switch
            {
                ResponseStatus.Success => "OK",
                ResponseStatus.Processing => "PROCESSING",
                ResponseStatus.Timeout => "TIMEOUT",
                _ => "ERROR"
            },
            ErrorMessage = response.ErrorMessage ?? string.Empty,
            ProcessingTimeMs = response.ProcessingTimeMs
        };
    }
}
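
The adapter maps `ResponseStatus` to a wire string in one direction and back in the other, and the two mappings have to agree: the reading side treats only `"OK"` as success, so any other spelling for `Success` comes back as an error. A small round-trip check makes that easy to verify. The helper names `ToWire` and `FromWire` are hypothetical, and the enum is repeated so the sketch stands alone:

```csharp
using System;

public enum ResponseStatus { Success, Error, Processing, Timeout }

public static class StatusMappingSketch
{
    // Domain status → string sent over gRPC.
    public static string ToWire(ResponseStatus status) => status switch
    {
        ResponseStatus.Success => "OK",
        ResponseStatus.Processing => "PROCESSING",
        ResponseStatus.Timeout => "TIMEOUT",
        _ => "ERROR"
    };

    // String received over gRPC → domain status; unknown values count as errors.
    public static ResponseStatus FromWire(string status) => status switch
    {
        "OK" => ResponseStatus.Success,
        "PROCESSING" => ResponseStatus.Processing,
        "TIMEOUT" => ResponseStatus.Timeout,
        _ => ResponseStatus.Error
    };

    // True when every enum value survives a trip to the wire and back.
    public static bool RoundTrips()
    {
        foreach (var status in Enum.GetValues<ResponseStatus>())
        {
            if (FromWire(ToWire(status)) != status)
                return false;
        }
        return true;
    }
}
```

A check like `RoundTrips()` fits well in a unit test, since it fails as soon as a new enum member is added without a matching wire value.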

6️⃣ gRPC Service Endpoint

// MCPPipeline.GrpcService/Services/MCPGrpcService.cs
using Grpc.Core;
using MCPPipeline.Adapter;
using MCPPipeline.Core.Services;
using MCPPipeline.Grpc;

namespace MCPPipeline.GrpcService.Services;

public class MCPGrpcServiceImpl : MCPService.MCPServiceBase
{
    private readonly IMCPKernelService _kernelService;
    private readonly IProtocolAdapter _adapter;
    private readonly ILogger<MCPGrpcServiceImpl> _logger;

    public MCPGrpcServiceImpl(
        IMCPKernelService kernelService,
        IProtocolAdapter adapter,
        ILogger<MCPGrpcServiceImpl> logger)
    {
        _kernelService = kernelService;
        _adapter = adapter;
        _logger = logger;
    }

    public override async Task<MCPGrpcResponse> SendCommand(
        MCPRequest request,
        ServerCallContext context)
    {
        _logger.LogInformation(
            "Recebido comando gRPC: {Command} de {Peer}",
            request.Command, context.Peer);

        // Convert to the unified model
        var command = _adapter.FromGrpcRequest(request);
        command = command with { SessionId = context.Peer };

        // Process through the unified kernel
        var response = await _kernelService.ExecuteCommandAsync(
            command, context.CancellationToken);

        // Convert the response
        return _adapter.ToGrpcResponse(response);
    }

    public override async Task StreamCommand(
        MCPRequest request,
        IServerStreamWriter<MCPStreamResponse> responseStream,
        ServerCallContext context)
    {
        _logger.LogInformation(
            "Iniciando streaming gRPC: {Command}", request.Command);

        var command = _adapter.FromGrpcRequest(request);
        int chunkIndex = 0;

        await foreach (var chunk in _kernelService.StreamCommandAsync(
            command, context.CancellationToken))
        {
            await responseStream.WriteAsync(new MCPStreamResponse
            {
                Content = chunk,
                ChunkIndex = chunkIndex++,
                IsComplete = false
            });
        }

        // Send the final chunk
        await responseStream.WriteAsync(new MCPStreamResponse
        {
            Content = "Stream concluído",
            ChunkIndex = chunkIndex,
            IsComplete = true
        });
    }

    public override Task<HealthResponse> HealthCheck(
        HealthRequest request,
        ServerCallContext context)
    {
        return Task.FromResult(new HealthResponse
        {
            Status = "Healthy",
            Version = "1.0.0",
            Protocol = "gRPC"
        });
    }
}

// Program.cs
using MCPPipeline.Adapter;
using MCPPipeline.Core.Events;
using MCPPipeline.Core.Services;
using MCPPipeline.GrpcService.Services;
using StackExchange.Redis;

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddGrpc();
builder.Services.AddSingleton<IMCPKernelService, MCPKernelService>();
builder.Services.AddSingleton<IProtocolAdapter, ProtocolAdapter>();

// Redis for the event bus
builder.Services.AddSingleton<IConnectionMultiplexer>(
    ConnectionMultiplexer.Connect("localhost:6379"));
builder.Services.AddSingleton<IEventBus, RedisEventBus>();

var app = builder.Build();

app.MapGrpcService<MCPGrpcServiceImpl>();
app.MapGet("/", () => "MCP gRPC Service");

app.Run();

7️⃣ WebSocket Service Endpoint

// MCPPipeline.WebSocketService/Hubs/MCPHub.cs
using MCPPipeline.Contracts.Messages;
using MCPPipeline.Core.Services;
using Microsoft.AspNetCore.SignalR;

namespace MCPPipeline.WebSocketService.Hubs;

public class MCPHub : Hub
{
    private readonly IMCPKernelService _kernelService;
    private readonly ILogger<MCPHub> _logger;

    public MCPHub(
        IMCPKernelService kernelService,
        ILogger<MCPHub> logger)
    {
        _kernelService = kernelService;
        _logger = logger;
    }

    public override async Task OnConnectedAsync()
    {
        _logger.LogInformation("Cliente WebSocket conectado: {ConnectionId}",
            Context.ConnectionId);

        await Clients.Caller.SendAsync("Connected", new
        {
            ConnectionId = Context.ConnectionId,
            Protocol = "WebSocket",
            Message = "Conectado ao MCP Hub"
        });

        await base.OnConnectedAsync();
    }

    public async Task SendCommand(MCPCommand command)
    {
        _logger.LogInformation(
            "Recebido comando WebSocket: {Command} de {ConnectionId}",
            command.Command, Context.ConnectionId);

        // Enrich the command with WebSocket connection data
        var enrichedCommand = command with
        {
            Protocol = ProtocolType.WebSocket,
            SessionId = Context.ConnectionId
        };

        // Process through the unified kernel
        var response = await _kernelService.ExecuteCommandAsync(
            enrichedCommand, Context.ConnectionAborted);

        // Send the response
        await Clients.Caller.SendAsync("CommandResponse", response);
    }

    public async Task StreamCommand(MCPCommand command)
    {
        _logger.LogInformation(
            "Iniciando streaming WebSocket: {Command}", command.Command);

        var enrichedCommand = command with
        {
            Protocol = ProtocolType.WebSocket,
            SessionId = Context.ConnectionId
        };

        await foreach (var chunk in _kernelService.StreamCommandAsync(
            enrichedCommand, Context.ConnectionAborted))
        {
            await Clients.Caller.SendAsync("StreamChunk", new
            {
                Content = chunk,
                Timestamp = DateTime.UtcNow
            });
        }

        await Clients.Caller.SendAsync("StreamComplete", new
        {
            Message = "Streaming concluído"
        });
    }
}

// Program.cs
using MCPPipeline.Core.Events;
using MCPPipeline.Core.Services;
using MCPPipeline.WebSocketService.Hubs;
using StackExchange.Redis;

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddSignalR();
builder.Services.AddSingleton<IMCPKernelService, MCPKernelService>();

// Redis for the event bus
builder.Services.AddSingleton<IConnectionMultiplexer>(
    ConnectionMultiplexer.Connect("localhost:6379"));
builder.Services.AddSingleton<IEventBus, RedisEventBus>();

builder.Services.AddCors(options =>
{
    options.AddPolicy("AllowAll", policy =>
        policy.AllowAnyOrigin().AllowAnyMethod().AllowAnyHeader());
});

var app = builder.Build();

app.UseCors("AllowAll");
app.MapHub<MCPHub>("/mcphub");
app.MapGet("/", () => "MCP WebSocket Service");

app.Run();

8️⃣ API Gateway with YARP

// MCPPipeline.Gateway/Program.cs
var builder = WebApplication.CreateBuilder(args);

builder.Services.AddReverseProxy()
    .LoadFromConfig(builder.Configuration.GetSection("ReverseProxy"));

var app = builder.Build();

app.MapReverseProxy();

app.MapGet("/", () => Results.Ok(new
{
    Service = "MCP Gateway",
    Endpoints = new
    {
        gRPC = "https://localhost:5001",
        WebSocket = "https://localhost:5002/mcphub"
    }
}));

app.Run();

// appsettings.json
{
  "ReverseProxy": {
    "Routes": {
      "grpc-route": {
        "ClusterId": "grpc-cluster",
        "Match": {
          "Path": "/grpc/{**catch-all}"
        }
      },
      "websocket-route": {
        "ClusterId": "websocket-cluster",
        "Match": {
          "Path": "/ws/{**catch-all}"
        }
      }
    },
    "Clusters": {
      "grpc-cluster": {
        "Destinations": {
          "destination1": {
            "Address": "https://localhost:5001"
          }
        }
      },
      "websocket-cluster": {
        "Destinations": {
          "destination1": {
            "Address": "https://localhost:5002"
          }
        },
        "HttpRequest": {
          "Version": "1.1",
          "VersionPolicy": "RequestVersionOrLower"
        }
      }
    }
  }
}

9️⃣ Unified Client

// MCPPipeline.Client/UnifiedMCPClient.cs
using System.Runtime.CompilerServices;
using System.Text.Json;
using System.Threading.Channels;
using Grpc.Core;
using Grpc.Net.Client;
using Microsoft.AspNetCore.SignalR.Client;
using Microsoft.Extensions.Logging;

public class UnifiedMCPClient : IAsyncDisposable
{
    private readonly GrpcChannel? _channel;
    private readonly MCPService.MCPServiceClient? _grpcClient;
    private readonly HubConnection? _hubConnection;
    private readonly ILogger<UnifiedMCPClient> _logger;
    private readonly bool _useGrpc;

    public UnifiedMCPClient(
        string endpoint,
        bool useGrpc,
        ILogger<UnifiedMCPClient> logger)
    {
        _useGrpc = useGrpc;
        _logger = logger;

        if (useGrpc)
        {
            // Keep a reference to the channel so it can be disposed with the client
            _channel = GrpcChannel.ForAddress(endpoint);
            _grpcClient = new MCPService.MCPServiceClient(_channel);
            _logger.LogInformation("Client configured for gRPC: {Endpoint}", endpoint);
        }
        else
        {
            _hubConnection = new HubConnectionBuilder()
                .WithUrl(endpoint)
                .WithAutomaticReconnect()
                .Build();
            SetupWebSocketHandlers();
            _logger.LogInformation("Client configured for WebSocket: {Endpoint}", endpoint);
        }
    }

    // Only the WebSocket transport needs an explicit connect; gRPC connects lazily
    public async Task ConnectAsync(CancellationToken ct = default)
    {
        if (!_useGrpc && _hubConnection != null)
        {
            await _hubConnection.StartAsync(ct);
            _logger.LogInformation("Connected via WebSocket");
        }
    }

    public async Task<MCPResponse> SendCommandAsync(
        string command,
        string payload,
        CancellationToken ct = default)
    {
        if (_useGrpc && _grpcClient != null)
        {
            return await SendViaGrpcAsync(command, payload, ct);
        }
        else if (_hubConnection != null)
        {
            return await SendViaWebSocketAsync(command, payload, ct);
        }

        throw new InvalidOperationException("Client not initialized");
    }

    private async Task<MCPResponse> SendViaGrpcAsync(
        string command,
        string payload,
        CancellationToken ct)
    {
        _logger.LogInformation("Sending command via gRPC: {Command}", command);

        var request = new MCPRequest
        {
            Command = command,
            Payload = payload,
            Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()
        };

        var response = await _grpcClient!.SendCommandAsync(request, cancellationToken: ct);

        return new MCPResponse
        {
            Result = response.Result,
            Status = response.Status == "OK" ? ResponseStatus.Success : ResponseStatus.Error,
            ErrorMessage = response.ErrorMessage,
            ProcessingTimeMs = response.ProcessingTimeMs
        };
    }

    private async Task<MCPResponse> SendViaWebSocketAsync(
        string command,
        string payload,
        CancellationToken ct)
    {
        _logger.LogInformation("Sending command via WebSocket: {Command}", command);

        var tcs = new TaskCompletionSource<MCPResponse>(
            TaskCreationOptions.RunContinuationsAsynchronously);

        // Dispose only this subscription: Remove("CommandResponse") would also drop
        // the handlers of other in-flight calls. Responses are still not correlated
        // with requests, so concurrent calls may observe each other's responses.
        using var subscription = _hubConnection!.On<MCPResponse>(
            "CommandResponse",
            response => { tcs.TrySetResult(response); });

        await _hubConnection.InvokeAsync("SendCommand", new MCPCommand
        {
            Command = command,
            Payload = payload,
            Timestamp = DateTime.UtcNow
        }, ct);

        // Fail with TimeoutException if the hub does not answer within 30 seconds
        return await tcs.Task.WaitAsync(TimeSpan.FromSeconds(30), ct);
    }

    public async IAsyncEnumerable<string> StreamCommandAsync(
        string command,
        string payload,
        [EnumeratorCancellation] CancellationToken ct = default)
    {
        if (_useGrpc && _grpcClient != null)
        {
            await foreach (var chunk in StreamViaGrpcAsync(command, payload, ct))
            {
                yield return chunk;
            }
        }
        else if (_hubConnection != null)
        {
            await foreach (var chunk in StreamViaWebSocketAsync(command, payload, ct))
            {
                yield return chunk;
            }
        }
    }

    private async IAsyncEnumerable<string> StreamViaGrpcAsync(
        string command,
        string payload,
        [EnumeratorCancellation] CancellationToken ct)
    {
        var request = new MCPRequest
        {
            Command = command,
            Payload = payload,
            Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()
        };

        using var call = _grpcClient!.StreamCommand(request, cancellationToken: ct);

        await foreach (var response in call.ResponseStream.ReadAllAsync(ct))
        {
            yield return response.Content;
        }
    }

    private async IAsyncEnumerable<string> StreamViaWebSocketAsync(
        string command,
        string payload,
        [EnumeratorCancellation] CancellationToken ct)
    {
        var channel = Channel.CreateUnbounded<string>();

        // With the default JSON hub protocol an untyped payload arrives as JsonElement;
        // "dynamic" member access on it fails at runtime. The property name is assumed
        // to be "content" (camelCase default) or "Content".
        using var chunkSubscription = _hubConnection!.On<JsonElement>("StreamChunk", chunk =>
        {
            if (chunk.ValueKind == JsonValueKind.Object &&
                (chunk.TryGetProperty("content", out var content) ||
                 chunk.TryGetProperty("Content", out content)))
            {
                channel.Writer.TryWrite(content.ToString());
            }
        });

        using var completeSubscription = _hubConnection.On<JsonElement>("StreamComplete", _ =>
        {
            channel.Writer.TryComplete();
        });

        await _hubConnection.InvokeAsync("StreamCommand", new MCPCommand
        {
            Command = command,
            Payload = payload,
            Timestamp = DateTime.UtcNow
        }, ct);

        await foreach (var chunk in channel.Reader.ReadAllAsync(ct))
        {
            yield return chunk;
        }
    }

    private void SetupWebSocketHandlers()
    {
        _hubConnection!.On<object>("Connected", data =>
        {
            _logger.LogInformation("✅ WebSocket connected: {Data}", data);
        });

        _hubConnection.Reconnecting += error =>
        {
            _logger.LogWarning("⚠️ WebSocket reconnecting: {Error}", error?.Message);
            return Task.CompletedTask;
        };

        _hubConnection.Reconnected += connectionId =>
        {
            _logger.LogInformation("✅ WebSocket reconnected: {ConnectionId}", connectionId);
            return Task.CompletedTask;
        };

        _hubConnection.Closed += error =>
        {
            _logger.LogError("❌ WebSocket closed: {Error}", error?.Message);
            return Task.CompletedTask;
        };
    }

    public async ValueTask DisposeAsync()
    {
        if (_hubConnection != null)
        {
            await _hubConnection.StopAsync();
            await _hubConnection.DisposeAsync();
        }

        _channel?.Dispose();
    }
}

// Usage example
public class Program
{
    public static async Task Main(string[] args)
    {
        using var loggerFactory = LoggerFactory.Create(builder => builder.AddConsole());
        var logger = loggerFactory.CreateLogger<UnifiedMCPClient>();

        // gRPC client
        await using var grpcClient = new UnifiedMCPClient(
            "https://localhost:5001",
            useGrpc: true,
            logger);

        var response = await grpcClient.SendCommandAsync("analyze", "Test via gRPC");
        Console.WriteLine($"gRPC: {response.Result}");

        // WebSocket client
        await using var wsClient = new UnifiedMCPClient(
            "https://localhost:5002/mcphub",
            useGrpc: false,
            logger);

        await wsClient.ConnectAsync();
        response = await wsClient.SendCommandAsync("analyze", "Test via WebSocket");
        Console.WriteLine($"WebSocket: {response.Result}");

        // Streaming
        await foreach (var chunk in wsClient.StreamCommandAsync("generate", "Test prompt"))
        {
            Console.WriteLine($"Chunk: {chunk}");
        }
    }
}
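The WebSocket path of the client matches a response to its request only by the `CommandResponse` event name, so two calls in flight on the same connection cannot tell their responses apart. One way to address this is a correlation ID. The sketch below is a minimal illustration, assuming the hub echoes back an identifier sent with each command; `PendingRequests` and the correlation ID are hypothetical and are not part of the `MCPCommand` or `MCPResponse` contracts shown in this series.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: tracks in-flight requests by correlation ID.
public sealed class PendingRequests<TResponse>
{
    private readonly ConcurrentDictionary<string, TaskCompletionSource<TResponse>> _pending = new();

    // Number of requests still waiting for a response.
    public int Count => _pending.Count;

    // Registers a request and returns its ID together with a task that completes
    // when Complete(id, ...) is called, or fails on timeout or cancellation.
    public (string Id, Task<TResponse> Response) Register(
        TimeSpan timeout, CancellationToken ct = default)
    {
        var id = Guid.NewGuid().ToString("N");
        var tcs = new TaskCompletionSource<TResponse>(
            TaskCreationOptions.RunContinuationsAsynchronously);
        _pending[id] = tcs;
        return (id, AwaitAsync(id, tcs, timeout, ct));
    }

    // Called from the single response handler; returns false for unknown IDs.
    public bool Complete(string id, TResponse response) =>
        _pending.TryRemove(id, out var tcs) && tcs.TrySetResult(response);

    private async Task<TResponse> AwaitAsync(
        string id, TaskCompletionSource<TResponse> tcs, TimeSpan timeout, CancellationToken ct)
    {
        try
        {
            return await tcs.Task.WaitAsync(timeout, ct);
        }
        finally
        {
            // Make sure a timed-out or cancelled request does not leak its entry.
            _pending.TryRemove(id, out _);
        }
    }
}
```

With a helper like this, the client would register one `CommandResponse` handler at connection time and route each message with `Complete(correlationId, response)`, instead of adding and removing a handler per call.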

🔍 Unified Observability

OpenTelemetry Configuration

// MCPPipeline.Observability/OpenTelemetryExtensions.cs
using Microsoft.Extensions.DependencyInjection;
using OpenTelemetry.Metrics;
using OpenTelemetry.Resources;
using OpenTelemetry.Trace;

public static class OpenTelemetryExtensions
{
    public static IServiceCollection AddMCPObservability(
        this IServiceCollection services,
        string serviceName)
    {
        services.AddOpenTelemetry()
            .ConfigureResource(resource =>
            {
                resource.AddService(
                    serviceName: serviceName,
                    serviceVersion: "1.0.0");
            })
            .WithTracing(tracing =>
            {
                tracing
                    .AddAspNetCoreInstrumentation(options =>
                    {
                        options.RecordException = true;
                        options.EnrichWithHttpRequest = (activity, request) =>
                        {
                            activity.SetTag("http.scheme", request.Scheme);
                            // Tag values should be primitives or strings, not IPAddress
                            activity.SetTag("http.client_ip",
                                request.HttpContext.Connection.RemoteIpAddress?.ToString());
                        };
                    })
                    .AddGrpcClientInstrumentation()
                    // Custom ActivitySource names used by the pipeline
                    .AddSource("MCPPipeline.Core")
                    .AddSource("MCPPipeline.GrpcService")
                    .AddSource("MCPPipeline.WebSocketService")
                    .AddOtlpExporter(options =>
                    {
                        options.Endpoint = new Uri("http://localhost:4317");
                    });
            })
            .WithMetrics(metrics =>
            {
                metrics
                    .AddAspNetCoreInstrumentation()
                    .AddRuntimeInstrumentation()
                    // Wildcard picks up every meter whose name starts with "MCPPipeline."
                    .AddMeter("MCPPipeline.*")
                    .AddPrometheusExporter()
                    .AddOtlpExporter();
            });

        return services;
    }
}

// Usage in the services
builder.Services.AddMCPObservability("MCPPipeline.GrpcService");

// The Prometheus exporter only serves /metrics once the endpoint is mapped:
// app.MapPrometheusScrapingEndpoint();

Custom Metrics

// MCPPipeline.Observability/MCPMetrics.cs
using System.Diagnostics.Metrics;

public class MCPMetrics
{
    private readonly Meter _meter;
    private readonly Counter<long> _commandsProcessed;
    private readonly Histogram<double> _commandDuration;
    private readonly Counter<long> _commandErrors;
    private readonly UpDownCounter<int> _activeConnections;

    public MCPMetrics()
    {
        // The meter name must match the AddMeter("MCPPipeline.*") filter
        _meter = new Meter("MCPPipeline.Metrics", "1.0.0");

        _commandsProcessed = _meter.CreateCounter<long>(
            "mcp.commands.processed",
            description: "Total commands processed");

        _commandDuration = _meter.CreateHistogram<double>(
            "mcp.command.duration",
            unit: "ms",
            description: "Command processing duration");

        _commandErrors = _meter.CreateCounter<long>(
            "mcp.commands.errors",
            description: "Total processing errors");

        _activeConnections = _meter.CreateUpDownCounter<int>(
            "mcp.connections.active",
            description: "Active connections");
    }

    public void RecordCommandProcessed(string command, ProtocolType protocol)
    {
        _commandsProcessed.Add(1,
            new KeyValuePair<string, object?>("command", command),
            new KeyValuePair<string, object?>("protocol", protocol.ToString()));
    }

    public void RecordCommandDuration(string command, ProtocolType protocol, double durationMs)
    {
        _commandDuration.Record(durationMs,
            new KeyValuePair<string, object?>("command", command),
            new KeyValuePair<string, object?>("protocol", protocol.ToString()));
    }

    public void RecordCommandError(string command, ProtocolType protocol, string errorType)
    {
        _commandErrors.Add(1,
            new KeyValuePair<string, object?>("command", command),
            new KeyValuePair<string, object?>("protocol", protocol.ToString()),
            new KeyValuePair<string, object?>("error_type", errorType));
    }

    public void IncrementActiveConnections(ProtocolType protocol)
    {
        _activeConnections.Add(1,
            new KeyValuePair<string, object?>("protocol", protocol.ToString()));
    }

    public void DecrementActiveConnections(ProtocolType protocol)
    {
        _activeConnections.Add(-1,
            new KeyValuePair<string, object?>("protocol", protocol.ToString()));
    }
}
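Custom instruments like these can be checked in a unit test without running a collector. The sketch below is one possible approach using `MeterListener` from `System.Diagnostics.Metrics`; `MetricsProbe` is a hypothetical helper, and it only captures `long` measurements, so an `int` instrument such as the active-connections counter would need a second callback registered for `int`.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics.Metrics;

// Hypothetical test helper, not part of the MCPPipeline projects.
public static class MetricsProbe
{
    // Runs `act` while listening to every instrument of the meter `meterName` and
    // returns the long-valued measurements observed as (name, value, tags).
    public static List<(string Name, long Value, Dictionary<string, object?> Tags)> Capture(
        string meterName, Action act)
    {
        var captured = new List<(string Name, long Value, Dictionary<string, object?> Tags)>();

        using var listener = new MeterListener();

        // Subscribe only to instruments that belong to the requested meter.
        listener.InstrumentPublished = (instrument, l) =>
        {
            if (instrument.Meter.Name == meterName)
            {
                l.EnableMeasurementEvents(instrument);
            }
        };

        // The tags arrive as a span, so copy them before the callback returns.
        listener.SetMeasurementEventCallback<long>((instrument, value, tags, _) =>
        {
            var copy = new Dictionary<string, object?>();
            foreach (var tag in tags)
            {
                copy[tag.Key] = tag.Value;
            }

            lock (captured)
            {
                captured.Add((instrument.Name, value, copy));
            }
        });

        listener.Start();
        act();
        return captured;
    }
}
```

A test could call `MetricsProbe.Capture("MCPPipeline.Metrics", () => metrics.RecordCommandProcessed(...))` and assert on the instrument name and the `protocol` tag of the captured measurement.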

📊 Dashboard with Grafana

Prometheus Configuration

# prometheus.yml
global:
  scrape_interval: 15s
  evaluation_interval: 15s

scrape_configs:
  - job_name: 'mcp-grpc-service'
    static_configs:
      - targets: ['localhost:5001']

  - job_name: 'mcp-websocket-service'
    static_configs:
      - targets: ['localhost:5002']

  - job_name: 'mcp-gateway'
    static_configs:
      - targets: ['localhost:5000']

Grafana Dashboard JSON

{
  "dashboard": {
    "title": "MCP Hybrid Protocol Dashboard",
    "panels": [
      {
        "title": "Commands Processed per Protocol",
        "targets": [
          {
            "expr": "sum by (protocol) (rate(mcp_commands_processed_total[5m]))",
            "legendFormat": "{{protocol}}"
          }
        ]
      },
      {
        "title": "p95 Latency per Command",
        "targets": [
          {
            "expr": "histogram_quantile(0.95, sum by (le, command) (rate(mcp_command_duration_bucket[5m])))",
            "legendFormat": "p95 - {{command}}"
          }
        ]
      },
      {
        "title": "Active Connections",
        "targets": [
          {
            "expr": "mcp_connections_active",
            "legendFormat": "{{protocol}}"
          }
        ]
      },
      {
        "title": "Error Rate",
        "targets": [
          {
            "expr": "sum by (protocol, error_type) (rate(mcp_commands_errors_total[5m]))",
            "legendFormat": "{{protocol}} - {{error_type}}"
          }
        ]
      }
    ]
  }
}

🧪 Integration Tests

Hybrid Test

// MCPPipeline.IntegrationTests/HybridProtocolTests.cs
using Microsoft.Extensions.Logging;
using Moq;
using Xunit;

public class HybridProtocolTests : IClassFixture<MCPTestFixture>
{
    private readonly MCPTestFixture _fixture;

    public HybridProtocolTests(MCPTestFixture fixture)
    {
        _fixture = fixture;
    }

    [Fact]
    public async Task Should_Process_Same_Command_Via_Both_Protocols()
    {
        // Arrange
        await using var grpcClient = _fixture.CreateGrpcClient();
        await using var wsClient = _fixture.CreateWebSocketClient();
        await wsClient.ConnectAsync();

        var testPayload = "Hybrid integration test";

        // Act
        var grpcResponse = await grpcClient.SendCommandAsync("analyze", testPayload);
        var wsResponse = await wsClient.SendCommandAsync("analyze", testPayload);

        // Assert
        Assert.Equal(ResponseStatus.Success, grpcResponse.Status);
        Assert.Equal(ResponseStatus.Success, wsResponse.Status);

        // Both must return equivalent results ("palavras" is the word the kernel's
        // analysis output is expected to contain)
        Assert.Contains("palavras", grpcResponse.Result);
        Assert.Contains("palavras", wsResponse.Result);
    }

    [Fact]
    public async Task Should_Handle_Concurrent_Requests_From_Both_Protocols()
    {
        // Arrange
        await using var grpcClient = _fixture.CreateGrpcClient();
        await using var wsClient = _fixture.CreateWebSocketClient();
        await wsClient.ConnectAsync();

        var tasks = new List<Task<MCPResponse>>();

        // Act - send 50 concurrent requests over each protocol
        for (int i = 0; i < 50; i++)
        {
            tasks.Add(grpcClient.SendCommandAsync("status", $"gRPC-{i}"));
            tasks.Add(wsClient.SendCommandAsync("status", $"WS-{i}"));
        }

        var responses = await Task.WhenAll(tasks);

        // Assert
        Assert.All(responses, r => Assert.Equal(ResponseStatus.Success, r.Status));
        Assert.Equal(100, responses.Length);
    }

    // Skipped until a listener completes eventReceived; without it the test
    // can only time out.
    [Fact(Skip = "Requires an event-bus listener on the WebSocket client")]
    public async Task Should_Broadcast_Events_Between_Protocols()
    {
        // Arrange
        await using var grpcClient = _fixture.CreateGrpcClient();
        await using var wsClient = _fixture.CreateWebSocketClient();
        await wsClient.ConnectAsync();

        var eventReceived = new TaskCompletionSource<bool>();

        // Configure the listener on the WebSocket
        // (implementation depends on the event bus)

        // Act - send a command via gRPC
        await grpcClient.SendCommandAsync("generate", "Event test");

        // Assert - the WebSocket must receive a notification
        var received = await eventReceived.Task.WaitAsync(TimeSpan.FromSeconds(5));
        Assert.True(received);
    }
}

public class MCPTestFixture : IAsyncLifetime
{
    private WebApplication? _grpcApp;
    private WebApplication? _wsApp;

    public async Task InitializeAsync()
    {
        // Start the test services
        _grpcApp = CreateGrpcService();
        _wsApp = CreateWebSocketService();

        await _grpcApp.StartAsync();
        await _wsApp.StartAsync();
    }

    public async Task DisposeAsync()
    {
        if (_grpcApp != null) await _grpcApp.StopAsync();
        if (_wsApp != null) await _wsApp.StopAsync();
    }

    public UnifiedMCPClient CreateGrpcClient()
    {
        return new UnifiedMCPClient(
            "https://localhost:5001",
            useGrpc: true,
            Mock.Of<ILogger<UnifiedMCPClient>>());
    }

    public UnifiedMCPClient CreateWebSocketClient()
    {
        return new UnifiedMCPClient(
            "https://localhost:5002/mcphub",
            useGrpc: false,
            Mock.Of<ILogger<UnifiedMCPClient>>());
    }

    private WebApplication CreateGrpcService()
    {
        var builder = WebApplication.CreateBuilder();
        // Bind to the port the clients above connect to
        builder.WebHost.UseUrls("https://localhost:5001");
        builder.Services.AddGrpc();
        builder.Services.AddSingleton<IMCPKernelService, MCPKernelService>();
        builder.Services.AddSingleton<IProtocolAdapter, ProtocolAdapter>();

        var app = builder.Build();
        app.MapGrpcService<MCPGrpcServiceImpl>();
        return app;
    }

    private WebApplication CreateWebSocketService()
    {
        var builder = WebApplication.CreateBuilder();
        builder.WebHost.UseUrls("https://localhost:5002");
        builder.Services.AddSignalR();
        builder.Services.AddSingleton<IMCPKernelService, MCPKernelService>();

        var app = builder.Build();
        app.MapHub<MCPHub>("/mcphub");
        return app;
    }
}

🚀 Performance Test

Benchmark with BenchmarkDotNet

// MCPPipeline.Benchmarks/ProtocolBenchmarks.cs
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Running;
using Microsoft.Extensions.Logging;
using Moq;

[MemoryDiagnoser]
[SimpleJob(warmupCount: 3, iterationCount: 10)]
public class ProtocolBenchmarks
{
    private UnifiedMCPClient _grpcClient = null!;
    private UnifiedMCPClient _wsClient = null!;

    // Both services must already be running on ports 5001 and 5002
    [GlobalSetup]
    public async Task Setup()
    {
        _grpcClient = new UnifiedMCPClient(
            "https://localhost:5001",
            useGrpc: true,
            Mock.Of<ILogger<UnifiedMCPClient>>());

        _wsClient = new UnifiedMCPClient(
            "https://localhost:5002/mcphub",
            useGrpc: false,
            Mock.Of<ILogger<UnifiedMCPClient>>());

        await _wsClient.ConnectAsync();
    }

    [Benchmark(Baseline = true)]
    public async Task<MCPResponse> GrpcCommand()
    {
        return await _grpcClient.SendCommandAsync("status", "benchmark");
    }

    [Benchmark]
    public async Task<MCPResponse> WebSocketCommand()
    {
        return await _wsClient.SendCommandAsync("status", "benchmark");
    }

    [Benchmark]
    public async Task GrpcStreaming()
    {
        await foreach (var _ in _grpcClient.StreamCommandAsync("generate", "test"))
        {
            // Consume the stream
        }
    }

    [Benchmark]
    public async Task WebSocketStreaming()
    {
        await foreach (var _ in _wsClient.StreamCommandAsync("generate", "test"))
        {
            // Consume the stream
        }
    }

    [GlobalCleanup]
    public async Task Cleanup()
    {
        await _grpcClient.DisposeAsync();
        await _wsClient.DisposeAsync();
    }
}

// Program.cs
public class Program
{
    public static void Main(string[] args)
    {
        BenchmarkRunner.Run<ProtocolBenchmarks>();
    }
}

Expected Results

| Method             | Mean      | Error     | StdDev    | Ratio | Gen0 | Allocated |
|------------------- |----------:|----------:|----------:|------:|-----:|----------:|
| GrpcCommand        |  2.450 ms | 0.0421 ms | 0.0394 ms |  1.00 | 15.6 |    128 KB |
| WebSocketCommand   |  8.732 ms | 0.1523 ms | 0.1425 ms |  3.56 | 31.2 |    256 KB |
| GrpcStreaming      | 12.105 ms | 0.2103 ms | 0.1967 ms |  4.94 | 46.8 |    384 KB |
| WebSocketStreaming | 18.421 ms | 0.3142 ms | 0.2940 ms |  7.52 | 62.5 |    512 KB |
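The Ratio column is each method's mean divided by the mean of the baseline method (`GrpcCommand`). A small sketch that reproduces the column from the Mean values in the table; `BenchmarkRatios` is a hypothetical helper written for this illustration, not a BenchmarkDotNet API.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper for illustration only.
public static class BenchmarkRatios
{
    // Ratio = mean of the method / mean of the baseline, rounded to two decimals.
    public static Dictionary<string, double> Compute(
        IReadOnlyDictionary<string, double> meansMs, string baseline)
    {
        var baselineMean = meansMs[baseline];
        var ratios = new Dictionary<string, double>();

        foreach (var (method, mean) in meansMs)
        {
            ratios[method] = Math.Round(mean / baselineMean, 2);
        }

        return ratios;
    }
}
```

For example, `WebSocketCommand` gives 8.732 / 2.450 ≈ 3.56, which means a unary command over WebSocket takes about 3.6 times as long as the same command over gRPC in this table.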

🎯 Protocol Decision: When to Use Each One

Decision Matrix

public class ProtocolSelector
{
    // Rules are evaluated in order; the first match wins
    public ProtocolType SelectOptimalProtocol(ClientContext context)
    {
        // 1. Is the client a browser? → WebSocket
        if (context.IsWebBrowser)
            return ProtocolType.WebSocket;

        // 2. Internal service-to-service communication? → gRPC
        if (context.IsInternalService)
            return ProtocolType.Grpc;

        // 3. Needs long-lived bidirectional streaming? → WebSocket
        if (context.RequiresLongLivedStream)
            return ProtocolType.WebSocket;

        // 4. Prioritizes minimum latency? → gRPC
        if (context.LatencySensitive)
            return ProtocolType.Grpc;

        // 5. Multiple clients need to receive broadcasts? → WebSocket
        if (context.RequiresBroadcast)
            return ProtocolType.WebSocket;

        // 6. Default: gRPC for performance
        return ProtocolType.Grpc;
    }
}

public record ClientContext
{
    public bool IsWebBrowser { get; init; }
    public bool IsInternalService { get; init; }
    public bool RequiresLongLivedStream { get; init; }
    public bool LatencySensitive { get; init; }
    public bool RequiresBroadcast { get; init; }
}
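Because the selector returns on the first matching rule, the order of the checks is part of its behavior: a browser client that is also latency-sensitive still gets WebSocket, since rule 1 runs before rule 4. The sketch below restates the same six rules as an ordered table so that precedence is explicit and each decision carries a reason. `Protocol`, `Client`, and `ProtocolRules` are hypothetical names chosen to avoid clashing with the article's `ProtocolType` and `ClientContext`.

```csharp
using System;

// Hypothetical stand-ins for ProtocolType and ClientContext.
public enum Protocol { Grpc, WebSocket }

public record Client(
    bool IsWebBrowser = false,
    bool IsInternalService = false,
    bool RequiresLongLivedStream = false,
    bool LatencySensitive = false,
    bool RequiresBroadcast = false);

public static class ProtocolRules
{
    // Ordered rules: the first predicate that matches decides the protocol.
    private static readonly (Func<Client, bool> Matches, Protocol Result, string Reason)[] Rules =
    {
        (c => c.IsWebBrowser,            Protocol.WebSocket, "browser client"),
        (c => c.IsInternalService,       Protocol.Grpc,      "internal service"),
        (c => c.RequiresLongLivedStream, Protocol.WebSocket, "long-lived stream"),
        (c => c.LatencySensitive,        Protocol.Grpc,      "latency sensitive"),
        (c => c.RequiresBroadcast,       Protocol.WebSocket, "broadcast"),
    };

    public static (Protocol Protocol, string Reason) Select(Client client)
    {
        foreach (var rule in Rules)
        {
            if (rule.Matches(client))
            {
                return (rule.Result, rule.Reason);
            }
        }

        // No rule matched: fall back to gRPC, as in the original selector.
        return (Protocol.Grpc, "default");
    }
}
```

Returning the reason alongside the protocol makes the decision easy to log, which helps when a client ends up on a protocol that was not expected.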

Comparison Table

| Scenario | gRPC | WebSocket | Recommendation |
|---|---|---|---|
| Backend API → Backend | ✅ Ideal | ⚠️ Overhead | gRPC |
| Web Application → Backend | ❌ Limited (browsers need gRPC-Web) | ✅ Native | WebSocket |
| Mobile App → Backend | ✅ Excellent | ✅ Good | gRPC (better performance) |
| Real-time Dashboard | ⚠️ Server streaming, or polling from browsers | ✅ Native push | WebSocket |
| Multi-user Chat | ❌ Not ideal | ✅ Perfect | WebSocket |
| Data Analysis | ✅ High performance | ⚠️ Slower | gRPC |
| AI Streaming | ✅ Efficient | ✅ Functional | Both (context dependent) |

🔒 Security in a Hybrid Environment

Unified Authentication

// MCPPipeline.Security/UnifiedAuthenticationExtensions.cs
using System.Text;
using Grpc.Core;
using Grpc.Core.Interceptors;
using Microsoft.AspNetCore.Authentication.JwtBearer;
using Microsoft.IdentityModel.Tokens;

public static class UnifiedAuthenticationExtensions
{
    public static IServiceCollection AddUnifiedAuthentication(
        this IServiceCollection services,
        IConfiguration configuration)
    {
        services.AddAuthentication(options =>
        {
            options.DefaultAuthenticateScheme = JwtBearerDefaults.AuthenticationScheme;
            options.DefaultChallengeScheme = JwtBearerDefaults.AuthenticationScheme;
        })
        .AddJwtBearer(options =>
        {
            options.TokenValidationParameters = new TokenValidationParameters
            {
                ValidateIssuer = true,
                ValidateAudience = true,
                ValidateLifetime = true,
                ValidateIssuerSigningKey = true,
                ValidIssuer = configuration["Jwt:Issuer"],
                ValidAudience = configuration["Jwt:Audience"],
                IssuerSigningKey = new SymmetricSecurityKey(
                    Encoding.UTF8.GetBytes(configuration["Jwt:Key"]!))
            };

            // Browsers cannot set headers on WebSocket requests, so SignalR sends
            // the token in the query string for hub paths
            options.Events = new JwtBearerEvents
            {
                OnMessageReceived = context =>
                {
                    var accessToken = context.Request.Query["access_token"];
                    var path = context.HttpContext.Request.Path;

                    if (!string.IsNullOrEmpty(accessToken) &&
                        path.StartsWithSegments("/mcphub"))
                    {
                        context.Token = accessToken;
                    }

                    return Task.CompletedTask;
                }
            };
        });

        // Register the interceptor that inspects gRPC request metadata
        services.AddGrpc(options =>
        {
            options.Interceptors.Add<AuthenticationInterceptor>();
        });

        return services;
    }
}

// Interceptor for gRPC.
// Only unary calls are covered here; streaming calls need the corresponding
// *ServerHandler overrides as well.
public class AuthenticationInterceptor : Interceptor
{
    private readonly ILogger<AuthenticationInterceptor> _logger;

    public AuthenticationInterceptor(ILogger<AuthenticationInterceptor> logger)
    {
        _logger = logger;
    }

    public override async Task<TResponse> UnaryServerHandler<TRequest, TResponse>(
        TRequest request,
        ServerCallContext context,
        UnaryServerMethod<TRequest, TResponse> continuation)
    {
        var authHeader = context.RequestHeaders.GetValue("authorization");

        if (string.IsNullOrEmpty(authHeader))
        {
            _logger.LogWarning("gRPC request without authentication");
            throw new RpcException(new Status(StatusCode.Unauthenticated, "Token not provided"));
        }

        // TODO: Implement real validation of the JWT.
        // Until then this only proves that a header is present, not that it is valid.
        _logger.LogInformation("Authorization header present for peer: {Peer}", context.Peer);

        return await continuation(request, context);
    }
}
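The interceptor leaves token validation as a TODO and only checks that the `authorization` header is present. A first step toward real validation is isolating the token from the `Bearer <token>` header value. The sketch below shows only that parsing step; `BearerToken` is a hypothetical helper, and the extracted token would still have to be validated, for example with `JwtSecurityTokenHandler.ValidateToken` and the same `TokenValidationParameters` used for the hub.

```csharp
using System;

// Hypothetical helper: parses the Authorization header, does not validate the JWT.
public static class BearerToken
{
    private const string Prefix = "Bearer ";

    // Returns true and the raw token when the header has the form "Bearer <token>".
    // The scheme name is compared case-insensitively.
    public static bool TryExtract(string? authorizationHeader, out string token)
    {
        token = string.Empty;

        if (string.IsNullOrWhiteSpace(authorizationHeader))
        {
            return false;
        }

        var header = authorizationHeader.Trim();
        if (!header.StartsWith(Prefix, StringComparison.OrdinalIgnoreCase))
        {
            return false;
        }

        token = header[Prefix.Length..].Trim();
        return token.Length > 0;
    }
}
```

Inside the interceptor, a failed `TryExtract` would map to the same `StatusCode.Unauthenticated` response that is already thrown for a missing header.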

✅ Conclusion and Next Steps

In this fourth part, we consolidated a hybrid architecture that combines gRPC and WebSocket, providing:

  • Low latency and strong typing for internal communication (gRPC)
  • Flexibility and real-time updates for web and mobile clients (WebSocket)
  • A unified kernel, centralized observability, and JWT-based authentication shared by both protocols

🔮 Next Steps

  • Part 5: Integration with advanced messaging (Kafka, RabbitMQ) for high-scale scenarios.
  • Part 6: Implementing circuit breakers and resilience with Polly.
  • Part 7: Strategies for multi-cloud deployment and CI/CD with GitHub Actions.

Tip: Explore the MCPPipeline.Hybrid repository for complete examples and automation scripts.


🤝 Connect with Me

If you work with modern .NET and want to master architecture, C#, observability, DevOps, or interoperability:

💼 LinkedIn
✍️ Medium
📬 contato@dopme.io
📬 devsfree@devsfree.com.br

⁸ Again, the devil took him to a very high mountain and showed him all the kingdoms of the world and their glory. ⁹ And he said to him: All this I will give you if you bow down and worship me. ¹⁰ Then Jesus said to him: Away from me, Satan, for it is written: You shall worship the Lord your God, and him only shall you serve. Matthew 4:8-10
