DEV Community

Cover image for Construindo um Servidor TCP com Proto.Actor: Explorando o Modelo de Atores no .NET
Rafael Andrade
Rafael Andrade

Posted on

Construindo um Servidor TCP com Proto.Actor: Explorando o Modelo de Atores no .NET

Introdução

Em artigos anterior, demonstrei o uso básico do Proto.Actor, um framework para o Modelo de Atores em .NET. Neste artigo, vamos construir um exemplo mais complexo: um servidor TCP usando três atores para gerenciar conexões, recepção de bytes e processamento de dados.

Visão Geral do Projeto

Atores Principais

  1. WaitForTcpConnectionActor:
    • Escuta por novas conexões TCP.
    • Cria uma instância de ReceiveBytesActor para cada conexão.
  2. ReceiveBytesActor:
    • Recebe bytes do socket.
    • Cria uma instância de ProcessActor para desserializar, logar os dados e reiniciá-lo até 3 vezes em caso de falha.
  3. ProcessActor:
    • Desserializa os bytes recebidos em um objeto Sample e o imprime no console.

Requisitos

Iniciando o Sistema de Atores

Configuração do sistema de atores para criar o WaitForTcpConnectionActor e encerrá-lo ao pressionar Ctrl+C:

using Proto; using TcpServer; var system = new ActorSystem(); var cancellationTokenSource = new CancellationTokenSource(); Console.CancelKeyPress += (_, _) => { cancellationTokenSource.Cancel(); }; system.Root.Spawn(Props.FromProducer(() => new WaitForTcpConnectionActor(9091))); while (!cancellationTokenSource.IsCancellationRequested) { await Task.Delay(1_000); } await system.ShutdownAsync(); 
Enter fullscreen mode Exit fullscreen mode

Ator de Espera por Conexão TCP

O Modelo de Atores funciona com os atores se comunicando por meio de mensagens. Vamos usar mensagens fornecido como Started e Terminated, além de nossas proprias messagens como a WaitForNextConnection.

Conforme mostrado no artigo anterior, um ator pode ser definido da seguinte forma:

public class WaitForTcpConnectionActor(int port) : IActor { public async Task ReceiveAsync(IContext context) { } } 
Enter fullscreen mode Exit fullscreen mode

Iniciando o Listener TCP

O primeiro passo é iniciar o servidor TCP usando a mensagem Started:

public class WaitForTcpConnectionActor(int port) : IActor { private TcpListener? _listener; public async Task ReceiveAsync(IContext context) { if (context.Message is Started) { Console.WriteLine("Escutando na porta 9091"); _listener = TcpListener.Create(port); _listener.Start(); } } } 
Enter fullscreen mode Exit fullscreen mode

Aguardando Conexões

O Modelo de Atores funciona com os atores se comunicando por meio de mensagens. Vamos usar mensagens integradas como Started e Terminated, além de uma mensagem personalizada chamada WaitForNextConnection.

Conforme mostrado no artigo anterior, um ator pode ser definido da seguinte forma:

public class WaitForTcpConnectionActor(int port) : IActor { public async Task ReceiveAsync(IContext context) { } } 
Enter fullscreen mode Exit fullscreen mode

Iniciando o TCP Listener

O primeiro passo é iniciar nosso servidor TCP. Para isso, usamos a mensagem Started:

public class WaitForTcpConnectionActor(int port) : IActor { private TcpListener? _listener; public async Task ReceiveAsync(IContext context) { if(context.Message is Started) { Console.WriteLine("Listening on port 9091"); _listener = TcpListener.Create(port); _listener.Start(); } } } 
Enter fullscreen mode Exit fullscreen mode

Em seguida, aguarde uma conexão, enviando uma mensagem ao próprio ator:

public class WaitForTcpConnectionActor(int port) : IActor { private TcpListener? _listener; public async Task ReceiveAsync(IContext context) { if(context.Message is Started) { _listener = TcpListener.Create(port); _listener.Start(); context.Send(context.Self, new WaitForNextConnection()); } } } 
Enter fullscreen mode Exit fullscreen mode

Aguardando uma Conexão TCP

Agora que estamos escutando conexões, precisamos aceitá-las e criar novos atores para processar cada uma:

public class WaitForTcpConnectionActor(int port) : IActor { ... public async Task ReceiveAsync(IContext context) { if(context.Message is Started) { ... } else if(context.Message is WaitForNextConnection) { var socket = await _listener!.AcceptSocketAsync(cancellationToken); var actor = context.Spawn(Props.FromProducer(() => new ReceiveBytesActor())) .WithChildSupervisorStrategy(new OneForOneStrategy( (_, exception) => { Console.WriteLine("Error: {0}", exception); return SupervisorDirective.Restart; }, 3, TimeSpan.FromSeconds(1)));; context.Send(actor, new SocketAccepted(socket)); context.Send(context.Self, new WaitForNextConnection()); } } } 
Enter fullscreen mode Exit fullscreen mode

Supervisão de Atores

Configuramos uma estratégia OneForOneStrategy para supervisionar instâncias de ReceiveBytesActor:

Se um ator filho falhar, ele será reiniciado até 3 vezes em 1 segundo.

Isso garante que erros transitórios (ex: mensagens malformadas) não derrubem todo o sistema.

Notificando Conclusão

Quando o processamento é concluído, o ator pai recebe a mensagem ProcessCompleted. Isso sinaliza ao pai para parar explicitamente o ator filho, garantindo liberação adequada de recursos e evitando vazamentos de memória.

public class WaitForTcpConnectionActor(int port) : IActor { ... public async Task ReceiveAsync(IContext context) { if(context.Message is Started) { ... } else if(context.Message is { Message: Terminated, Sender: not null })) { _listener?.Dispose(); } else if(context.Message is ProcessCompleted) { await context.StopAsync(Sender); } else if(context.Message is WaitForNextConnection) { ... } } } 
Enter fullscreen mode Exit fullscreen mode

Liberação de Recursos

Quando uma conexão é processada:

  • A mensagem ProcessCompleted indica conclusão.
  • O ator pai para o filho e aciona a liberação de recursos.

Desligamento Elegante

Ao desligar o sistema de atores, devemos liberar corretamente o TcpListener para evitar vazamentos de recursos:

public class WaitForTcpConnectionActor(int port) : IActor { ... public async Task ReceiveAsync(IContext context) { if(context.Message is Started) { ... } else if(context.Message is { Message: Terminated, Sender: not null })) { _listener?.Dispose(); } else if(context.Message is ProcessCompleted) { .... } else if(context.Message is WaitForNextConnection) { ... } } } 
Enter fullscreen mode Exit fullscreen mode

Recebendo Bytes

O próximo passo é receber bytes de um socket.

Tratando SocketAccepted

Quando uma nova conexão é aceita, o ator armazena o socket e lê os bytes disponíveis:

public class ReceiveBytesActor : IActor { private Socket? _socket; private byte[]? _buffer; public async Task ReceiveAsync(IContext context) { if(context.Message is SocketAccepted socket) { _socket = socket; _buffer = new byte[_socket.Available]; await _socket.ReceiveAsync(_buffer); var props = Props.FromProducer(() => new ProcessActor()); var actor = context.SpawnNamed(props, "json-serializer"); context.Send(actor, new SocketReceived(_buffer!)); } } } 
Enter fullscreen mode Exit fullscreen mode

Notificando Conclusão

Após o processamento, o ator para o ProcessActor filho e notifica seu pai para liberar recursos:

public class ReceiveBytesActor : IActor { public async Task ReceiveAsync(IContext context) { if(context.Message is SocketAccepted socket) { // Lógica de recepção de bytes } else if(context.Message is ProcessCompleted) { await context.StopAsync(Sender); context.Send(context.Parent!, new ProcessCompleted()); } } } 
Enter fullscreen mode Exit fullscreen mode

Encerramento Elegante do Socket

Quando o ator é encerrado, ele descarta o socket e para todos os atores filhos para evitar vazamentos de memória:

public class ReceiveBytesActor : IActor { public async Task ReceiveAsync(IContext context) { if(context.Message is Terminated) { _buffer = null; _socket?.Dispose(); await context.Children.StopMany(context); } else if(context.Message is SocketAccepted socket) { // Lógica de recepção de bytes } else if(context.Message is ProcessCompleted) { // Notificação de conclusão } } } 
Enter fullscreen mode Exit fullscreen mode

Reenviando o Buffer Recebido

Se o ProcessActor falhar e reiniciar, o ReceiveBytesActor reenvia o buffer armazenado para reprocessamento:

public class ReceiveBytesActor : IActor { public async Task ReceiveAsync(IContext context) { if(context.Message is Terminated) { // Liberação de recursos } else if(context.Message is SocketAccepted socket) { // Lógica de recepção de bytes } else if(context.Message is ProcessCompleted) { // Notificação de conclusão } else if(context.Message is ResendBufferReceived) { context.Send(Sender, new ResendBufferReceived(_buffer!)); } } } 
Enter fullscreen mode Exit fullscreen mode

Processamento de Dados

O último ator desserializa e registra os dados recebidos.

Mensagem BufferReceived

A mensagem BufferReceived contém os bytes brutos do socket. Este ator desserializa os dados em um objeto Sample e imprime no console. Após o processamento, ele notifica o ator pai (ReceiveBytesActor) por meio da mensagem ProcessCompleted para liberar recursos:

public class ProcessActor : IActor { public Task ReceiveAsync(IContext context) { if (context.Message is BufferReceived socketReceived) { var json = JsonSerializer.Deserialize<Sample>(socketReceived.Data)!; Console.WriteLine("Recebida com ID: {0} e nome: {1}", json.Id, json.Name); context.Send(context.Parent!, new ProcessCompleted(context.Self)); } return Task.CompletedTask; } } 
Enter fullscreen mode Exit fullscreen mode

Reinicialização (Restarting)

Quando um ator é reiniciado, o Proto.Actor envia a mensagem Restarting para o próprio ator. Isso permite que o ator notifique seu ator pai para retransmitir a mensagem original (ou estado), garantindo que o ator reiniciado possa reprocessá-la:

public class ProcessActor : IActor { public Task ReceiveAsync(IContext context) { if (context.Message is Restarting) { context.Send(context.Parent!, new ResendBufferReceived()); } else if (context.Message is BufferReceived socketReceived) { ... } return Task.CompletedTask; } } 
Enter fullscreen mode Exit fullscreen mode

Cliente TCP

Implementação de um cliente TCP simples que envia entrada do usuário (convertida em JSON):

using System.Net.Sockets; using System.Text.Json; using TcpServer.Client; var id = 0; while (true) { Console.Write("Digite um nome (q para sair/f para não serializar): "); var name = Console.ReadLine(); if (string.IsNullOrWhiteSpace(name)) { continue; } if (name == "q") { break; } try { var connection = new TcpClient(); await connection.ConnectAsync("localhost", 9091); var stream = connection.GetStream(); if (name == "f") { await stream.WriteAsync(new[] { (byte)'f' }); } else { await JsonSerializer.SerializeAsync(stream, new Sample { Id = id++, Name = name }); } connection.Close(); } catch (Exception e) { Console.WriteLine("Erro: {0}", e.Message); } } 
Enter fullscreen mode Exit fullscreen mode

Conclusão

O Proto.Actor é uma ferramenta poderosa para construir sistemas tolerantes a falhas, demonstrando como o Modelo de Atores simplifica concorrência, gerenciamento de recursos e recuperação de erros. Neste exemplo, exploramos:

  • Estratégias de Supervisão: Uso de OneForOneStrategy para reiniciar atores falhos até 3 vezes.
  • Gerenciamento do Ciclo de Vida dos Atores: Tratamento de mensagens como Started, Terminated e Restarting.
  • Resiliência de Mensagens: Retentativas de operações falhas via ResendSocketAccepted e ProcessCompleted.

Considerações para Produção

  1. Leitura de Bytes:
    • O método _socket.Available é útil para demonstrações, mas pode ser inconsistente em produção. Use MemoryStream para lidar com dados de tamanho variável.
  2. Tratamento de Erros:
    • Em sistemas reais, encapsule operações de socket em blocos try-catch e implemente logs detalhados.
  3. Liberação de Recursos:
    • Use timeouts ou verificações de heartbeat para evitar conexões órfãs.

Código Completo

Repositório GitHub

Top comments (0)