Integração Completa com WebSocket
Nesta terceira parte da série "Minha Primeira Comunicação com MCP e .NET", exploramos como implementar uma integração bidirecional em tempo real com WebSocket, permitindo que o MCP (Model Context Protocol) mantenha conexões persistentes e troque mensagens com aplicações .NET de forma eficiente e responsiva.
🚀 Introdução
Enquanto o gRPC (abordado na Parte 2) é excelente para comunicação servidor-a-servidor, o WebSocket brilha em cenários onde é necessário manter conexões persistentes bidirecionais com clientes diversos, incluindo navegadores web, aplicações móveis e agentes distribuídos.
O WebSocket é o protocolo ideal quando o MCP precisa:
- Enviar atualizações em tempo real sem polling
- Manter estado de conexão com múltiplos clientes simultâneos
- Suportar comunicação full-duplex com baixa latência
- Integrar-se nativamente com aplicações web frontend
Este artigo demonstra como criar uma arquitetura robusta de comunicação MCP via WebSocket no .NET 8, com foco em escalabilidade, resiliência e boas práticas.
⚙️ O que é WebSocket e quando usá-lo?
WebSocket é um protocolo de comunicação que fornece canais full-duplex sobre uma única conexão TCP. Diferente do HTTP tradicional, após o handshake inicial, a conexão permanece aberta, permitindo troca de mensagens em ambas as direções sem overhead de múltiplas requisições.
Comparação: WebSocket vs HTTP vs gRPC
| Característica | WebSocket | HTTP/REST | gRPC |
|---|---|---|---|
| Conexão | Persistente | Stateless | Persistente |
| Direção | Bidirecional | Request/Response | Bidirecional |
| Overhead | Baixo | Alto | Muito Baixo |
| Suporte Web | ✅ Nativo | ✅ Nativo | ⚠️ Limitado |
| Tipagem | Flexível | Flexível | Forte (Protobuf) |
| Use Case | Real-time, Dashboards | APIs públicas | Microsserviços |
Quando usar WebSocket com MCP:
✅ Dashboards em tempo real - Monitoramento de agentes MCP
✅ Chat e assistentes - Comunicação interativa com LLMs
✅ Streaming de resultados - Respostas progressivas de IA
✅ Notificações push - Alertas e eventos de sistema
✅ Sincronização multi-cliente - Estado compartilhado entre sessões
🧠 Arquitetura da Integração MCP + WebSocket + .NET
A arquitetura utiliza SignalR (camada de abstração sobre WebSocket) para gerenciar conexões e broadcasting:
┌──────────────┐ ┌─────────────────┐ ┌──────────────┐ │ │ │ │ │ │ │ MCP Agent │ ◄─────► │ SignalR Hub │ ◄─────► │ MCP Kernel │ │ (Cliente) │ WS │ (.NET 8) │ │ Orchestrator│ │ │ │ │ │ │ └──────────────┘ └─────────────────┘ └──────────────┘ ▲ │ ▲ │ │ │ │ ┌──────▼──────┐ │ │ │ │ │ └──────────────────┤ Connection │──────────────────┘ │ Manager │ │ + State │ └─────────────┘ Componentes principais:
- SignalR Hub - Gerencia conexões, roteamento e broadcasting
- Connection Manager - Rastreia clientes ativos e metadados
- MCP Kernel Orchestrator - Processa comandos e retorna resultados
- State Manager - Mantém contexto de sessão por conexão
🏗️ Implementação Passo a Passo
1️⃣ Criar o Projeto Base
dotnet new web -n MCPPipeline.WebSocket cd MCPPipeline.WebSocket dotnet add package Microsoft.AspNetCore.SignalR Estrutura do projeto:
MCPPipeline.WebSocket/ ├── Hubs/ │ └── MCPHub.cs ├── Services/ │ ├── IMCPKernelService.cs │ ├── MCPKernelService.cs │ └── ConnectionManager.cs ├── Models/ │ ├── MCPMessage.cs │ └── MCPResponse.cs ├── Program.cs └── appsettings.json 2️⃣ Definir Modelos de Mensagem
namespace MCPPipeline.WebSocket.Models; public record MCPMessage { public string Command { get; init; } = string.Empty; public string Payload { get; init; } = string.Empty; public Dictionary<string, string> Metadata { get; init; } = new(); public DateTime Timestamp { get; init; } = DateTime.UtcNow; public string SessionId { get; init; } = string.Empty; } public record MCPResponse { public string Result { get; init; } = string.Empty; public string Status { get; init; } = string.Empty; public string? ErrorMessage { get; init; } public long ProcessingTimeMs { get; init; } public DateTime Timestamp { get; init; } = DateTime.UtcNow; } public record MCPStreamChunk { public string Content { get; init; } = string.Empty; public int ChunkIndex { get; init; } public bool IsComplete { get; init; } public string SessionId { get; init; } = string.Empty; } public enum MCPEventType { CommandReceived, CommandProcessing, CommandCompleted, CommandFailed, AgentConnected, AgentDisconnected } public record MCPEvent { public MCPEventType Type { get; init; } public string Message { get; init; } = string.Empty; public DateTime Timestamp { get; init; } = DateTime.UtcNow; public Dictionary<string, object> Data { get; init; } = new(); } 3️⃣ Implementar o Connection Manager
namespace MCPPipeline.WebSocket.Services; public interface IConnectionManager { void AddConnection(string connectionId, string userId); void RemoveConnection(string connectionId); IEnumerable<string> GetUserConnections(string userId); IEnumerable<string> GetAllConnections(); int GetConnectionCount(); Dictionary<string, object> GetConnectionMetadata(string connectionId); } public class ConnectionManager : IConnectionManager { private readonly ConcurrentDictionary<string, ConnectionInfo> _connections = new(); private readonly ILogger<ConnectionManager> _logger; public ConnectionManager(ILogger<ConnectionManager> logger) { _logger = logger; } public void AddConnection(string connectionId, string userId) { var info = new ConnectionInfo { ConnectionId = connectionId, UserId = userId, ConnectedAt = DateTime.UtcNow, LastActivity = DateTime.UtcNow }; _connections.TryAdd(connectionId, info); _logger.LogInformation("Conexão adicionada: {ConnectionId} | User: {UserId}", connectionId, userId); } public void RemoveConnection(string connectionId) { if (_connections.TryRemove(connectionId, out var info)) { var duration = DateTime.UtcNow - info.ConnectedAt; _logger.LogInformation( "Conexão removida: {ConnectionId} | Duração: {Duration}s", connectionId, duration.TotalSeconds); } } public IEnumerable<string> GetUserConnections(string userId) { return _connections.Values .Where(c => c.UserId == userId) .Select(c => c.ConnectionId); } public IEnumerable<string> GetAllConnections() { return _connections.Keys; } public int GetConnectionCount() => _connections.Count; public Dictionary<string, object> GetConnectionMetadata(string connectionId) { if (_connections.TryGetValue(connectionId, out var info)) { return new Dictionary<string, object> { ["userId"] = info.UserId, ["connectedAt"] = info.ConnectedAt, ["lastActivity"] = info.LastActivity, ["duration"] = (DateTime.UtcNow - info.ConnectedAt).TotalSeconds }; } return new Dictionary<string, object>(); } private class ConnectionInfo { public string ConnectionId { get; init; } = string.Empty; public string UserId { get; init; } = string.Empty; public DateTime ConnectedAt { get; init; } public DateTime LastActivity { get; set; } } } 4️⃣ Implementar o MCP Kernel Service
namespace MCPPipeline.WebSocket.Services; public interface IMCPKernelService { Task<MCPResponse> ExecuteCommandAsync(MCPMessage message, CancellationToken ct); IAsyncEnumerable<MCPStreamChunk> StreamResponseAsync( MCPMessage message, CancellationToken ct); } public class MCPKernelService : IMCPKernelService { private readonly ILogger<MCPKernelService> _logger; public MCPKernelService(ILogger<MCPKernelService> logger) { _logger = logger; } public async Task<MCPResponse> ExecuteCommandAsync( MCPMessage message, CancellationToken ct) { var sw = Stopwatch.StartNew(); try { _logger.LogInformation( "Executando comando: {Command} | Session: {SessionId}", message.Command, message.SessionId); var result = message.Command switch { "analyze" => await AnalyzeContextAsync(message.Payload, ct), "summarize" => await SummarizeTextAsync(message.Payload, ct), "translate" => await TranslateAsync(message.Payload, ct), "generate" => await GenerateContentAsync(message.Payload, ct), "status" => await GetSystemStatusAsync(ct), _ => throw new InvalidOperationException( $"Comando desconhecido: {message.Command}") }; sw.Stop(); return new MCPResponse { Result = result, Status = "SUCCESS", ProcessingTimeMs = sw.ElapsedMilliseconds }; } catch (Exception ex) { _logger.LogError(ex, "Erro ao executar comando: {Command}", message.Command); sw.Stop(); return new MCPResponse { Status = "ERROR", ErrorMessage = ex.Message, ProcessingTimeMs = sw.ElapsedMilliseconds }; } } public async IAsyncEnumerable<MCPStreamChunk> StreamResponseAsync( MCPMessage message, [EnumeratorCancellation] CancellationToken ct) { _logger.LogInformation("Iniciando streaming para: {Command}", message.Command); var chunks = await SimulateStreamingResponseAsync(message.Payload, ct); for (int i = 0; i < chunks.Count; i++) { yield return new MCPStreamChunk { Content = chunks[i], ChunkIndex = i, IsComplete = i == chunks.Count - 1, SessionId = message.SessionId }; await Task.Delay(100, ct); // Simula latência de streaming } } private async Task<string> AnalyzeContextAsync(string payload, CancellationToken ct) { await Task.Delay(200, ct); var words = payload.Split(' ').Length; return $"Análise concluída: {words} palavras, {payload.Length} caracteres"; } private async Task<string> SummarizeTextAsync(string payload, CancellationToken ct) { await Task.Delay(500, ct); var summary = payload.Length > 100 ? payload[..100] + "..." : payload; return $"Resumo: {summary}"; } private async Task<string> TranslateAsync(string payload, CancellationToken ct) { await Task.Delay(300, ct); return $"[Traduzido] {payload}"; } private async Task<string> GenerateContentAsync(string payload, CancellationToken ct) { await Task.Delay(1000, ct); return $"Conteúdo gerado baseado em: {payload}"; } private async Task<string> GetSystemStatusAsync(CancellationToken ct) { await Task.Delay(50, ct); return $"Sistema operacional | Timestamp: {DateTime.UtcNow:yyyy-MM-dd HH:mm:ss}"; } private async Task<List<string>> SimulateStreamingResponseAsync( string prompt, CancellationToken ct) { await Task.Delay(100, ct); return new List<string> { "Processando sua solicitação...", "Analisando contexto...", "Gerando resposta...", $"Resultado final para: {prompt}" }; } } 5️⃣ Implementar o SignalR Hub
using Microsoft.AspNetCore.SignalR; using MCPPipeline.WebSocket.Models; using MCPPipeline.WebSocket.Services; namespace MCPPipeline.WebSocket.Hubs; public class MCPHub : Hub { private readonly IMCPKernelService _kernelService; private readonly IConnectionManager _connectionManager; private readonly ILogger<MCPHub> _logger; public MCPHub( IMCPKernelService kernelService, IConnectionManager connectionManager, ILogger<MCPHub> logger) { _kernelService = kernelService; _connectionManager = connectionManager; _logger = logger; } public override async Task OnConnectedAsync() { var connectionId = Context.ConnectionId; var userId = Context.User?.Identity?.Name ?? "anonymous"; _connectionManager.AddConnection(connectionId, userId); await Clients.Caller.SendAsync("Connected", new { ConnectionId = connectionId, Message = "Conectado ao MCP WebSocket Server", Timestamp = DateTime.UtcNow }); // Notificar outros clientes await Clients.Others.SendAsync("AgentJoined", new MCPEvent { Type = MCPEventType.AgentConnected, Message = $"Novo agente conectado: {connectionId}", Data = new Dictionary<string, object> { ["connectionId"] = connectionId, ["userId"] = userId } }); _logger.LogInformation("Cliente conectado: {ConnectionId}", connectionId); await base.OnConnectedAsync(); } public override async Task OnDisconnectedAsync(Exception? exception) { var connectionId = Context.ConnectionId; _connectionManager.RemoveConnection(connectionId); await Clients.Others.SendAsync("AgentLeft", new MCPEvent { Type = MCPEventType.AgentDisconnected, Message = $"Agente desconectado: {connectionId}" }); if (exception != null) { _logger.LogError(exception, "Cliente desconectado com erro: {ConnectionId}", connectionId); } else { _logger.LogInformation("Cliente desconectado: {ConnectionId}", connectionId); } await base.OnDisconnectedAsync(exception); } public async Task SendCommand(MCPMessage message) { var connectionId = Context.ConnectionId; _logger.LogInformation( "Comando recebido: {Command} de {ConnectionId}", message.Command, connectionId); // Notificar que o comando foi recebido await Clients.Caller.SendAsync("CommandReceived", new MCPEvent { Type = MCPEventType.CommandReceived, Message = "Comando recebido e será processado", Data = new Dictionary<string, object> { ["command"] = message.Command, ["sessionId"] = message.SessionId } }); // Processar comando var response = await _kernelService.ExecuteCommandAsync( message with { SessionId = connectionId }, Context.ConnectionAborted); // Enviar resposta ao cliente await Clients.Caller.SendAsync("CommandResponse", response); // Log do evento await Clients.All.SendAsync("SystemEvent", new MCPEvent { Type = response.Status == "SUCCESS" ? MCPEventType.CommandCompleted : MCPEventType.CommandFailed, Message = $"Comando {message.Command} processado", Data = new Dictionary<string, object> { ["processingTime"] = response.ProcessingTimeMs, ["status"] = response.Status } }); } public async Task StreamCommand(MCPMessage message) { var connectionId = Context.ConnectionId; _logger.LogInformation( "Iniciando streaming: {Command} de {ConnectionId}", message.Command, connectionId); await foreach (var chunk in _kernelService.StreamResponseAsync( message with { SessionId = connectionId }, Context.ConnectionAborted)) { await Clients.Caller.SendAsync("StreamChunk", chunk); } await Clients.Caller.SendAsync("StreamComplete", new { SessionId = connectionId, Message = "Streaming concluído" }); } public async Task<object> GetServerStatus() { var connections = _connectionManager.GetConnectionCount(); return await Task.FromResult(new { Status = "Online", ActiveConnections = connections, Uptime = Environment.TickCount64 / 1000.0, Timestamp = DateTime.UtcNow }); } public async Task JoinGroup(string groupName) { await Groups.AddToGroupAsync(Context.ConnectionId, groupName); await Clients.Group(groupName).SendAsync("UserJoinedGroup", new { ConnectionId = Context.ConnectionId, GroupName = groupName }); _logger.LogInformation( "Cliente {ConnectionId} entrou no grupo {GroupName}", Context.ConnectionId, groupName); } public async Task LeaveGroup(string groupName) { await Groups.RemoveFromGroupAsync(Context.ConnectionId, groupName); _logger.LogInformation( "Cliente {ConnectionId} saiu do grupo {GroupName}", Context.ConnectionId, groupName); } public async Task BroadcastToGroup(string groupName, MCPMessage message) { await Clients.Group(groupName).SendAsync("GroupMessage", message); } } 6️⃣ Configurar o Program.cs
using MCPPipeline.WebSocket.Hubs; using MCPPipeline.WebSocket.Services; var builder = WebApplication.CreateBuilder(args); // Adicionar serviços builder.Services.AddSignalR(options => { options.EnableDetailedErrors = true; options.MaximumReceiveMessageSize = 1024 * 1024; // 1MB options.KeepAliveInterval = TimeSpan.FromSeconds(15); options.ClientTimeoutInterval = TimeSpan.FromSeconds(30); }); builder.Services.AddSingleton<IConnectionManager, ConnectionManager>(); builder.Services.AddScoped<IMCPKernelService, MCPKernelService>(); // CORS para desenvolvimento builder.Services.AddCors(options => { options.AddPolicy("AllowAll", policy => { policy.AllowAnyOrigin() .AllowAnyMethod() .AllowAnyHeader(); }); }); var app = builder.Build(); // Configurar pipeline app.UseCors("AllowAll"); app.MapHub<MCPHub>("/mcphub"); app.MapGet("/", () => Results.Ok(new { Service = "MCP WebSocket Server", Status = "Running", Version = "1.0.0", HubEndpoint = "/mcphub" })); app.MapGet("/health", (IConnectionManager connectionManager) => Results.Ok(new { Status = "Healthy", ActiveConnections = connectionManager.GetConnectionCount(), Timestamp = DateTime.UtcNow })); app.Run(); 7️⃣ Cliente JavaScript/TypeScript
import * as signalR from "@microsoft/signalr"; class MCPWebSocketClient { private connection: signalR.HubConnection; constructor(hubUrl: string) { this.connection = new signalR.HubConnectionBuilder() .withUrl(hubUrl) .withAutomaticReconnect({ nextRetryDelayInMilliseconds: (context) => { return Math.min(1000 * Math.pow(2, context.previousRetryCount), 30000); } }) .configureLogging(signalR.LogLevel.Information) .build(); this.setupEventHandlers(); } private setupEventHandlers(): void { this.connection.on("Connected", (data) => { console.log("✅ Conectado:", data); }); this.connection.on("CommandResponse", (response) => { console.log("📥 Resposta:", response); }); this.connection.on("StreamChunk", (chunk) => { console.log(`📡 Chunk ${chunk.chunkIndex}:`, chunk.content); }); this.connection.on("StreamComplete", (data) => { console.log("✅ Streaming concluído:", data); }); this.connection.on("SystemEvent", (event) => { console.log("🔔 Evento:", event.type, event.message); }); this.connection.onreconnecting((error) => { console.warn("⚠️ Reconectando...", error); }); this.connection.onreconnected((connectionId) => { console.log("✅ Reconectado:", connectionId); }); this.connection.onclose((error) => { console.error("❌ Conexão fechada:", error); }); } async connect(): Promise<void> { try { await this.connection.start(); console.log("✅ Conexão SignalR estabelecida"); } catch (error) { console.error("❌ Erro ao conectar:", error); throw error; } } async sendCommand(command: string, payload: string): Promise<void> { const message = { command, payload, metadata: {}, timestamp: new Date().toISOString(), sessionId: this.connection.connectionId || "" }; await this.connection.invoke("SendCommand", message); } async streamCommand(command: string, payload: string): Promise<void> { const message = { command, payload, metadata: {}, timestamp: new Date().toISOString(), sessionId: this.connection.connectionId || "" }; await this.connection.invoke("StreamCommand", message); } async getStatus(): Promise<any> { return await this.connection.invoke("GetServerStatus"); } async disconnect(): Promise<void> { await this.connection.stop(); } } // Uso const client = new MCPWebSocketClient("https://localhost:5001/mcphub"); await client.connect(); await client.sendCommand("analyze", "Este é um texto de teste para análise"); await client.streamCommand("generate", "Gere um artigo sobre WebSockets"); const status = await client.getStatus(); console.log("Status do servidor:", status); 8️⃣ Cliente C# (.NET)
using Microsoft.AspNetCore.SignalR.Client; using MCPPipeline.WebSocket.Models; public class MCPWebSocketClient : IAsyncDisposable { private readonly HubConnection _connection; private readonly ILogger<MCPWebSocketClient> _logger; public MCPWebSocketClient(string hubUrl, ILogger<MCPWebSocketClient> logger) { _logger = logger; _connection = new HubConnectionBuilder() .WithUrl(hubUrl) .WithAutomaticReconnect(new[] { TimeSpan.FromSeconds(0), TimeSpan.FromSeconds(2), TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(30) }) .Build(); SetupEventHandlers(); } private void SetupEventHandlers() { _connection.On<object>("Connected", data => { _logger.LogInformation("✅ Conectado: {Data}", data); }); _connection.On<MCPResponse>("CommandResponse", response => { _logger.LogInformation("📥 Resposta: {Result} ({Status})", response.Result, response.Status); }); _connection.On<MCPStreamChunk>("StreamChunk", chunk => { _logger.LogInformation("📡 Chunk {Index}: {Content}", chunk.ChunkIndex, chunk.Content); }); _connection.On<MCPEvent>("SystemEvent", evt => { _logger.LogInformation("🔔 Evento: {Type} - {Message}", evt.Type, evt.Message); }); _connection.Reconnecting += error => { _logger.LogWarning("⚠️ Reconectando... {Error}", error?.Message); return Task.CompletedTask; }; _connection.Reconnected += connectionId => { _logger.LogInformation("✅ Reconectado: {ConnectionId}", connectionId); return Task.CompletedTask; }; _connection.Closed += error => { _logger.LogError("❌ Conexão fechada: {Error}", error?.Message); return Task.CompletedTask; }; } public async Task ConnectAsync(CancellationToken ct = default) { await _connection.StartAsync(ct); _logger.LogInformation("✅ Conexão SignalR estabelecida"); } public async Task SendCommandAsync(string command, string payload, CancellationToken ct = default) { var message = new MCPMessage { Command = command, Payload = payload, Timestamp = DateTime.UtcNow, SessionId = _connection.ConnectionId ?? string.Empty }; await _connection.InvokeAsync("SendCommand", message, ct); } public async Task StreamCommandAsync(string command, string payload, CancellationToken ct = default) { var message = new MCPMessage { Command = command, Payload = payload, Timestamp = DateTime.UtcNow, SessionId = _connection.ConnectionId ?? string.Empty }; await _connection.InvokeAsync("StreamCommand", message, ct); } public async Task<object> GetStatusAsync(CancellationToken ct = default) { return await _connection.InvokeAsync<object>("GetServerStatus", ct); } public async ValueTask DisposeAsync() { await _connection.DisposeAsync(); } } // Uso await using var client = new MCPWebSocketClient( "https://localhost:5001/mcphub", loggerFactory.CreateLogger<MCPWebSocketClient>()); await client.ConnectAsync(); await client.SendCommandAsync("analyze", "Texto para análise"); await client.StreamCommandAsync("generate", "Prompt de geração"); var status = await client.GetStatusAsync(); Console.WriteLine($"Status: {status}"); 🔒 Segurança e Autenticação
Implementar JWT Authentication
// Program.cs builder.Services.AddAuthentication(JwtBearerDefaults.AuthenticationScheme) .AddJwtBearer(options => { options.TokenValidationParameters = new TokenValidationParameters { ValidateIssuer = true, ValidateAudience = true, ValidateLifetime = true, ValidateIssuerSigningKey = true, ValidIssuer = builder.Configuration["Jwt:Issuer"], ValidAudience = builder.Configuration["Jwt:Audience"], IssuerSigningKey = new SymmetricSecurityKey( Encoding.UTF8.GetBytes(builder.Configuration["Jwt:Key"]!)) }; // Configurar autenticação para WebSocket options.Events = new JwtBearerEvents { OnMessageReceived = context => { var accessToken = context.Request.Query["access_token"]; var path = context.HttpContext.Request.Path; if (!string.IsNullOrEmpty(accessToken) && path.StartsWithSegments("/mcphub")) { context.Token = accessToken; } return Task.CompletedTask; } }; }); builder.Services.AddAuthorization(); // Aplicar middleware app.UseAuthentication(); app.UseAuthorization(); Autorizar Hub
[Authorize] public class MCPHub : Hub { // Todos os métodos exigem autenticação // Autorização baseada em roles [Authorize(Roles = "Admin,MCPAgent")] public async Task AdminCommand(MCPMessage message) { // Apenas admins e agentes MCP podem executar } } Validação de Rate Limiting
public class RateLimitFilter : IHubFilter { private readonly ILogger<RateLimitFilter> _logger; private readonly ConcurrentDictionary<string, RateLimitInfo> _rateLimits = new(); public RateLimitFilter(ILogger<RateLimitFilter> logger) { _logger = logger; } public async ValueTask<object?> InvokeMethodAsync( HubInvocationContext invocationContext, Func<HubInvocationContext, ValueTask<object?>> next) { var connectionId = invocationContext.Context.ConnectionId; if (!CheckRateLimit(connectionId)) { _logger.LogWarning("Rate limit excedido: {ConnectionId}", connectionId); throw new HubException("Rate limit excedido. Tente novamente em alguns segundos."); } return await next(invocationContext); } private bool CheckRateLimit(string connectionId) { var info = _rateLimits.GetOrAdd(connectionId, _ => new RateLimitInfo()); var now = DateTime.UtcNow; var window = now.AddSeconds(-60); info.Requests.RemoveAll(r => r < window); info.Requests.Add(now); return info.Requests.Count <= 60; // Max 60 requisições por minuto } private class RateLimitInfo { public List<DateTime> Requests { get; } = new(); } } // Registrar no Program.cs builder.Services.AddSignalR(options => { options.AddFilter<RateLimitFilter>(); }); 📊 Monitoramento e Observabilidade
Implementar Health Checks
builder.Services.AddHealthChecks() .AddCheck<SignalRHealthCheck>("signalr") .AddCheck("connections", () => { var connectionManager = app.Services.GetRequiredService<IConnectionManager>(); var count = connectionManager.GetConnectionCount(); return count < 10000 ? HealthCheckResult.Healthy($"Conexões ativas: {count}") : HealthCheckResult.Degraded($"Muitas conexões: {count}"); }); app.MapHealthChecks("/health", new HealthCheckOptions { ResponseWriter = UIResponseWriter.WriteHealthCheckUIResponse }); public class SignalRHealthCheck : IHealthCheck { private readonly IConnectionManager _connectionManager; public SignalRHealthCheck(IConnectionManager connectionManager) { _connectionManager = connectionManager; } public Task<HealthCheckResult> CheckHealthAsync( HealthCheckContext context, CancellationToken ct = default) { var connections = _connectionManager.GetConnectionCount(); var data = new Dictionary<string, object> { ["active_connections"] = connections, ["timestamp"] = DateTime.UtcNow }; return Task.FromResult( HealthCheckResult.Healthy("SignalR Hub operacional", data)); } } Adicionar OpenTelemetry
builder.Services.AddOpenTelemetry() .WithTracing(tracing => { tracing .AddAspNetCoreInstrumentation() .AddSource("MCPPipeline.WebSocket") .AddOtlpExporter(); }) .WithMetrics(metrics => { metrics .AddAspNetCoreInstrumentation() .AddMeter("MCPPipeline.WebSocket") .AddPrometheusExporter(); }); // Usar no Hub public class MCPHub : Hub { private static readonly ActivitySource ActivitySource = new("MCPPipeline.WebSocket"); public async Task SendCommand(MCPMessage message) { using var activity = ActivitySource.StartActivity("ProcessCommand"); activity?.SetTag("command", message.Command); activity?.SetTag("connection_id", Context.ConnectionId); // Processar comando... } } Métricas Customizadas
public class MCPMetricsService { private static readonly Counter<long> CommandsProcessed = Meter.CreateCounter<long>("mcp.commands.processed"); private static readonly Histogram<double> CommandDuration = Meter.CreateHistogram<double>("mcp.command.duration", "ms"); private static readonly ObservableGauge<int> ActiveConnections; private readonly IConnectionManager _connectionManager; private static readonly Meter Meter = new("MCPPipeline.WebSocket"); public MCPMetricsService(IConnectionManager connectionManager) { _connectionManager = connectionManager; ActiveConnections = Meter.CreateObservableGauge( "mcp.connections.active", () => _connectionManager.GetConnectionCount()); } public void RecordCommandProcessed(string command, string status, double durationMs) { CommandsProcessed.Add(1, new KeyValuePair<string, object?>("command", command), new KeyValuePair<string, object?>("status", status)); CommandDuration.Record(durationMs, new KeyValuePair<string, object?>("command", command)); } } 🚀 Escalabilidade com Redis Backplane
Para escalar horizontalmente com múltiplas instâncias:
dotnet add package Microsoft.AspNetCore.SignalR.StackExchangeRedis builder.Services.AddSignalR() .AddStackExchangeRedis(options => { options.Configuration.EndPoints.Add("localhost:6379"); options.Configuration.ChannelPrefix = "MCPHub"; }); Arquitetura Escalável
┌─────────────┐ ┌──────────────┐ ┌─────────────┐ │ Cliente 1 │────────►│ Server 1 │────────►│ │ └─────────────┘ │ (SignalR) │ │ │ └──────────────┘ │ │ │ │ Redis │ ┌─────────────┐ ▼ │ Backplane │ │ Cliente 2 │────────►┌──────────────┐ │ │ └─────────────┘ │ Server 2 │───────►│ │ │ (SignalR) │ │ │ ┌─────────────┐ └──────────────┘ └─────────────┘ │ Cliente 3 │────────►┌──────────────┐ │ └─────────────┘ │ Server 3 │───────────────┘ │ (SignalR) │ └──────────────┘ 🧪 Testes Automatizados
Teste de Integração
public class MCPHubIntegrationTests : IClassFixture<WebApplicationFactory<Program>> { private readonly WebApplicationFactory<Program> _factory; public MCPHubIntegrationTests(WebApplicationFactory<Program> factory) { _factory = factory; } [Fact] public async Task Should_Connect_And_Receive_Welcome_Message() { // Arrange var hubConnection = new HubConnectionBuilder() .WithUrl("http://localhost/mcphub", options => { options.HttpMessageHandlerFactory = _ => _factory.Server.CreateHandler(); }) .Build(); var welcomeReceived = false; hubConnection.On<object>("Connected", data => { welcomeReceived = true; }); // Act await hubConnection.StartAsync(); await Task.Delay(500); // Aguardar mensagem // Assert Assert.True(welcomeReceived); await hubConnection.StopAsync(); } [Fact] public async Task Should_Process_Command_And_Return_Response() { // Arrange var hubConnection = new HubConnectionBuilder() .WithUrl("http://localhost/mcphub", options => { options.HttpMessageHandlerFactory = _ => _factory.Server.CreateHandler(); }) .Build(); MCPResponse? response = null; hubConnection.On<MCPResponse>("CommandResponse", r => response = r); await hubConnection.StartAsync(); // Act await hubConnection.InvokeAsync("SendCommand", new MCPMessage { Command = "status", Payload = "test" }); await Task.Delay(1000); // Assert Assert.NotNull(response); Assert.Equal("SUCCESS", response.Status); await hubConnection.StopAsync(); } } Teste de Carga
public class LoadTests { [Fact] public async Task Should_Handle_Multiple_Concurrent_Connections() { const int clientCount = 100; var clients = new List<HubConnection>(); var tasks = new List<Task>(); try { // Criar múltiplos clientes for (int i = 0; i < clientCount; i++) { var connection = new HubConnectionBuilder() .WithUrl("https://localhost:5001/mcphub") .Build(); clients.Add(connection); tasks.Add(connection.StartAsync()); } await Task.WhenAll(tasks); tasks.Clear(); // Enviar comandos concorrentemente foreach (var client in clients) { tasks.Add(client.InvokeAsync("SendCommand", new MCPMessage { Command = "status", Payload = "load test" })); } await Task.WhenAll(tasks); // Assert Assert.Equal(clientCount, clients.Count(c => c.State == HubConnectionState.Connected)); } finally { foreach (var client in clients) { await client.StopAsync(); await client.DisposeAsync(); } } } } 🎯 Boas Práticas
✅ DOs
- Use reconexão automática para lidar com quedas de rede
- Implemente heartbeat para detectar conexões inativas
- Valide mensagens no servidor antes de processar
- Use grupos para broadcast eficiente a subconjuntos de clientes
- Implemente rate limiting para prevenir abuso
- Monitore métricas de conexões ativas e latência
- Use backplane Redis em ambientes multi-instância
- Implemente circuit breaker para proteger serviços downstream
❌ DON'Ts
- Não armazene estado no Hub (use serviços externos)
- Não envie dados sensíveis sem criptografia
- Não bloqueie o thread com operações síncronas
- Não ignore exceções em handlers de eventos
- Não use WebSocket quando HTTP/REST é suficiente
- Não deixe conexões órfãs sem timeout
🔄 Comparação: WebSocket vs gRPC vs REST
| Critério | WebSocket | gRPC | REST |
|---|---|---|---|
| Latência | Muito Baixa | Muito Baixa | Média |
| Overhead | Baixo | Muito Baixo | Alto |
| Complexidade | Média | Alta | Baixa |
| Suporte Browser | ✅ Nativo | ⚠️ Limitado | ✅ Total |
| Streaming | ✅ Bidirecional | ✅ Bidirecional | ❌ Não |
| Tipagem | Flexível | Forte | Flexível |
| Estado | Stateful | Stateless/Stateful | Stateless |
| Use Case Principal | Real-time UI | Microsserviços | APIs públicas |
📈 Métricas de Performance
Benchmarks Esperados
- Latência média: < 50ms para comandos simples
- Throughput: > 10.000 mensagens/segundo por instância
- Conexões simultâneas: > 100.000 com Redis Backplane
- Tempo de reconexão: < 5 segundos
- Uso de memória: ~1KB por conexão ativa
Otimizações
// 1. Usar MessagePack para serialização binária builder.Services.AddSignalR() .AddMessagePackProtocol(); // 2. Configurar compressão builder.Services.AddResponseCompression(options => { options.MimeTypes = ResponseCompressionDefaults.MimeTypes.Concat( new[] { "application/octet-stream" }); }); // 3. Configurar Kestrel para alta performance builder.WebHost.ConfigureKestrel(options => { options.Limits.MaxConcurrentConnections = 100000; options.Limits.MaxConcurrentUpgradedConnections = 100000; options.Limits.MaxRequestBodySize = 10 * 1024 * 1024; // 10MB }); 🧩 Integração com Semantic Kernel
public class MCPWebSocketPlugin { private readonly MCPWebSocketClient _client; public MCPWebSocketPlugin(MCPWebSocketClient client) { _client = client; } [KernelFunction] [Description("Analisa texto usando o serviço MCP via WebSocket")] public async Task<string> AnalyzeTextAsync( [Description("Texto para análise")] string text) { await _client.SendCommandAsync("analyze", text); // Aguardar resposta (implementar padrão de callback ou TaskCompletionSource) return "Análise enviada para processamento"; } [KernelFunction] [Description("Gera conteúdo com streaming em tempo real")] public async IAsyncEnumerable<string> GenerateStreamingAsync( [Description("Prompt de geração")] string prompt) { var chunks = new List<string>(); // Configurar handler temporário para capturar chunks // (implementação depende da arquitetura do cliente) await _client.StreamCommandAsync("generate", prompt); foreach (var chunk in chunks) { yield return chunk; } } } 🎯 Casos de Uso Práticos
1. Dashboard de Monitoramento em Tempo Real
public class MonitoringHub : Hub { private readonly IMetricsCollector _metrics; public async Task SubscribeToMetrics(string systemId) { await Groups.AddToGroupAsync(Context.ConnectionId, $"metrics-{systemId}"); // Enviar histórico imediato var history = await _metrics.GetRecentHistoryAsync(systemId); await Clients.Caller.SendAsync("MetricsHistory", history); } // Background service envia atualizações periódicas public async Task BroadcastMetricsUpdate(string systemId, object metrics) { await Clients.Group($"metrics-{systemId}") .SendAsync("MetricsUpdate", metrics); } } 2. Chat com IA Multi-Usuário
public class AIChatHub : Hub { private readonly ILLMService _llm; public async Task SendMessage(string roomId, string message) { // Broadcast mensagem do usuário await Clients.Group(roomId).SendAsync("UserMessage", new { User = Context.User?.Identity?.Name, Message = message, Timestamp = DateTime.UtcNow }); // Processar com IA e fazer streaming da resposta await foreach (var chunk in _llm.GenerateResponseAsync(message)) { await Clients.Group(roomId).SendAsync("AIResponseChunk", chunk); } } public async Task JoinRoom(string roomId) { await Groups.AddToGroupAsync(Context.ConnectionId, roomId); await Clients.Group(roomId).SendAsync("UserJoined", Context.User?.Identity?.Name); } } 3. Sincronização de Estado Colaborativo
public class CollaborationHub : Hub { private readonly IStateManager _state; public async Task UpdateDocument(string documentId, DocumentChange change) { // Aplicar mudança await _state.ApplyChangeAsync(documentId, change); // Broadcast para outros usuários await Clients.OthersInGroup(documentId) .SendAsync("DocumentChanged", change); } public async Task<Document> JoinDocument(string documentId) { await Groups.AddToGroupAsync(Context.ConnectionId, documentId); return await _state.GetDocumentAsync(documentId); } } 🎓 Conclusão
A integração do MCP com WebSocket no .NET cria uma infraestrutura poderosa para aplicações que exigem comunicação bidirecional em tempo real. Combinando a simplicidade do SignalR com a inteligência contextual do Model Context Protocol, é possível construir experiências interativas e responsivas que escalam horizontalmente.
Principais vantagens:
- ✅ Comunicação full-duplex com latência mínima
- ✅ Suporte nativo a navegadores e clientes diversos
- ✅ Reconexão automática e resiliência built-in
- ✅ Escalabilidade horizontal com Redis Backplane
- ✅ Integração natural com frameworks modernos
Quando escolher WebSocket:
- Dashboards e visualizações em tempo real
- Aplicações colaborativas multi-usuário
- Chat e assistentes conversacionais
- Notificações e alertas instantâneos
- Streaming de resultados progressivos de IA
Próximos passos:
- Implementar autenticação OAuth2/OpenID Connect
- Adicionar persistência de mensagens com Event Sourcing
- Configurar CDN e edge locations para latência global
- Implementar quotas e throttling por tenant
Na Parte 4 desta série, exploraremos "Observabilidade e Tracing Distribuído" entre MCP e .NET usando OpenTelemetry, Jaeger e Grafana.
🤝 Conecte-se Comigo
Se você trabalha com .NET moderno e quer dominar arquitetura, C#, observabilidade, DevOps ou interoperabilidade:
💼 LinkedIn
✍️ Medium
📬 contato@dopme.io
📬 devsfree@devsfree.com.br
📚 Referências:
- SignalR Documentation
- WebSocket Protocol (RFC 6455)
- SignalR Redis Backplane
- Real-time Web with ASP.NET Core
⁴⁹ E, vendo os que estavam com ele o que ia suceder, disseram-lhe: Senhor, feriremos à espada? Lucas 22:49
Top comments (0)