Introdução
Em artigos anterior, demonstrei o uso básico do Proto.Actor, um framework para o Modelo de Atores em .NET. Neste artigo, vamos construir um exemplo mais complexo: um servidor TCP usando três atores para gerenciar conexões, recepção de bytes e processamento de dados.
Visão Geral do Projeto
Atores Principais
- WaitForTcpConnectionActor:
- Escuta por novas conexões TCP.
- Cria uma instância de
ReceiveBytesActor
para cada conexão.
- ReceiveBytesActor:
- Recebe bytes do socket.
- Cria uma instância de
ProcessActor
para desserializar, logar os dados e reiniciá-lo até 3 vezes em caso de falha.
- ProcessActor:
- Desserializa os bytes recebidos em um objeto
Sample
e o imprime no console.
- Desserializa os bytes recebidos em um objeto
Requisitos
- .NET 8+
- Pacotes NuGet:
Iniciando o Sistema de Atores
Configuração do sistema de atores para criar o WaitForTcpConnectionActor
e encerrá-lo ao pressionar Ctrl+C
:
using Proto; using TcpServer; var system = new ActorSystem(); var cancellationTokenSource = new CancellationTokenSource(); Console.CancelKeyPress += (_, _) => { cancellationTokenSource.Cancel(); }; system.Root.Spawn(Props.FromProducer(() => new WaitForTcpConnectionActor(9091))); while (!cancellationTokenSource.IsCancellationRequested) { await Task.Delay(1_000); } await system.ShutdownAsync();
Ator de Espera por Conexão TCP
O Modelo de Atores funciona com os atores se comunicando por meio de mensagens. Vamos usar mensagens fornecido como Started e Terminated, além de nossas proprias messagens como a WaitForNextConnection.
Conforme mostrado no artigo anterior, um ator pode ser definido da seguinte forma:
public class WaitForTcpConnectionActor(int port) : IActor { public async Task ReceiveAsync(IContext context) { } }
Iniciando o Listener TCP
O primeiro passo é iniciar o servidor TCP usando a mensagem Started
:
public class WaitForTcpConnectionActor(int port) : IActor { private TcpListener? _listener; public async Task ReceiveAsync(IContext context) { if (context.Message is Started) { Console.WriteLine("Escutando na porta 9091"); _listener = TcpListener.Create(port); _listener.Start(); } } }
Aguardando Conexões
O Modelo de Atores funciona com os atores se comunicando por meio de mensagens. Vamos usar mensagens integradas como Started
e Terminated, além de uma mensagem personalizada chamada WaitForNextConnection
.
Conforme mostrado no artigo anterior, um ator pode ser definido da seguinte forma:
public class WaitForTcpConnectionActor(int port) : IActor { public async Task ReceiveAsync(IContext context) { } }
Iniciando o TCP Listener
O primeiro passo é iniciar nosso servidor TCP. Para isso, usamos a mensagem Started
:
public class WaitForTcpConnectionActor(int port) : IActor { private TcpListener? _listener; public async Task ReceiveAsync(IContext context) { if(context.Message is Started) { Console.WriteLine("Listening on port 9091"); _listener = TcpListener.Create(port); _listener.Start(); } } }
Em seguida, aguarde uma conexão, enviando uma mensagem ao próprio ator:
public class WaitForTcpConnectionActor(int port) : IActor { private TcpListener? _listener; public async Task ReceiveAsync(IContext context) { if(context.Message is Started) { _listener = TcpListener.Create(port); _listener.Start(); context.Send(context.Self, new WaitForNextConnection()); } } }
Aguardando uma Conexão TCP
Agora que estamos escutando conexões, precisamos aceitá-las e criar novos atores para processar cada uma:
public class WaitForTcpConnectionActor(int port) : IActor { ... public async Task ReceiveAsync(IContext context) { if(context.Message is Started) { ... } else if(context.Message is WaitForNextConnection) { var socket = await _listener!.AcceptSocketAsync(cancellationToken); var actor = context.Spawn(Props.FromProducer(() => new ReceiveBytesActor())) .WithChildSupervisorStrategy(new OneForOneStrategy( (_, exception) => { Console.WriteLine("Error: {0}", exception); return SupervisorDirective.Restart; }, 3, TimeSpan.FromSeconds(1)));; context.Send(actor, new SocketAccepted(socket)); context.Send(context.Self, new WaitForNextConnection()); } } }
Supervisão de Atores
Configuramos uma estratégia OneForOneStrategy para supervisionar instâncias de ReceiveBytesActor
:
Se um ator filho falhar, ele será reiniciado até 3 vezes em 1 segundo.
Isso garante que erros transitórios (ex: mensagens malformadas) não derrubem todo o sistema.
Notificando Conclusão
Quando o processamento é concluído, o ator pai recebe a mensagem ProcessCompleted
. Isso sinaliza ao pai para parar explicitamente o ator filho, garantindo liberação adequada de recursos e evitando vazamentos de memória.
public class WaitForTcpConnectionActor(int port) : IActor { ... public async Task ReceiveAsync(IContext context) { if(context.Message is Started) { ... } else if(context.Message is { Message: Terminated, Sender: not null })) { _listener?.Dispose(); } else if(context.Message is ProcessCompleted) { await context.StopAsync(Sender); } else if(context.Message is WaitForNextConnection) { ... } } }
Liberação de Recursos
Quando uma conexão é processada:
- A mensagem ProcessCompleted indica conclusão.
- O ator pai para o filho e aciona a liberação de recursos.
Desligamento Elegante
Ao desligar o sistema de atores, devemos liberar corretamente o TcpListener
para evitar vazamentos de recursos:
public class WaitForTcpConnectionActor(int port) : IActor { ... public async Task ReceiveAsync(IContext context) { if(context.Message is Started) { ... } else if(context.Message is { Message: Terminated, Sender: not null })) { _listener?.Dispose(); } else if(context.Message is ProcessCompleted) { .... } else if(context.Message is WaitForNextConnection) { ... } } }
Recebendo Bytes
O próximo passo é receber bytes de um socket.
Tratando SocketAccepted
Quando uma nova conexão é aceita, o ator armazena o socket e lê os bytes disponíveis:
public class ReceiveBytesActor : IActor { private Socket? _socket; private byte[]? _buffer; public async Task ReceiveAsync(IContext context) { if(context.Message is SocketAccepted socket) { _socket = socket; _buffer = new byte[_socket.Available]; await _socket.ReceiveAsync(_buffer); var props = Props.FromProducer(() => new ProcessActor()); var actor = context.SpawnNamed(props, "json-serializer"); context.Send(actor, new SocketReceived(_buffer!)); } } }
Notificando Conclusão
Após o processamento, o ator para o ProcessActor
filho e notifica seu pai para liberar recursos:
public class ReceiveBytesActor : IActor { public async Task ReceiveAsync(IContext context) { if(context.Message is SocketAccepted socket) { // Lógica de recepção de bytes } else if(context.Message is ProcessCompleted) { await context.StopAsync(Sender); context.Send(context.Parent!, new ProcessCompleted()); } } }
Encerramento Elegante do Socket
Quando o ator é encerrado, ele descarta o socket e para todos os atores filhos para evitar vazamentos de memória:
public class ReceiveBytesActor : IActor { public async Task ReceiveAsync(IContext context) { if(context.Message is Terminated) { _buffer = null; _socket?.Dispose(); await context.Children.StopMany(context); } else if(context.Message is SocketAccepted socket) { // Lógica de recepção de bytes } else if(context.Message is ProcessCompleted) { // Notificação de conclusão } } }
Reenviando o Buffer Recebido
Se o ProcessActor
falhar e reiniciar, o ReceiveBytesActor
reenvia o buffer armazenado para reprocessamento:
public class ReceiveBytesActor : IActor { public async Task ReceiveAsync(IContext context) { if(context.Message is Terminated) { // Liberação de recursos } else if(context.Message is SocketAccepted socket) { // Lógica de recepção de bytes } else if(context.Message is ProcessCompleted) { // Notificação de conclusão } else if(context.Message is ResendBufferReceived) { context.Send(Sender, new ResendBufferReceived(_buffer!)); } } }
Processamento de Dados
O último ator desserializa e registra os dados recebidos.
Mensagem BufferReceived
A mensagem BufferReceived
contém os bytes brutos do socket. Este ator desserializa os dados em um objeto Sample
e imprime no console. Após o processamento, ele notifica o ator pai (ReceiveBytesActor
) por meio da mensagem ProcessCompleted
para liberar recursos:
public class ProcessActor : IActor { public Task ReceiveAsync(IContext context) { if (context.Message is BufferReceived socketReceived) { var json = JsonSerializer.Deserialize<Sample>(socketReceived.Data)!; Console.WriteLine("Recebida com ID: {0} e nome: {1}", json.Id, json.Name); context.Send(context.Parent!, new ProcessCompleted(context.Self)); } return Task.CompletedTask; } }
Reinicialização (Restarting
)
Quando um ator é reiniciado, o Proto.Actor
envia a mensagem Restarting
para o próprio ator. Isso permite que o ator notifique seu ator pai para retransmitir a mensagem original (ou estado), garantindo que o ator reiniciado possa reprocessá-la:
public class ProcessActor : IActor { public Task ReceiveAsync(IContext context) { if (context.Message is Restarting) { context.Send(context.Parent!, new ResendBufferReceived()); } else if (context.Message is BufferReceived socketReceived) { ... } return Task.CompletedTask; } }
Cliente TCP
Implementação de um cliente TCP simples que envia entrada do usuário (convertida em JSON):
using System.Net.Sockets; using System.Text.Json; using TcpServer.Client; var id = 0; while (true) { Console.Write("Digite um nome (q para sair/f para não serializar): "); var name = Console.ReadLine(); if (string.IsNullOrWhiteSpace(name)) { continue; } if (name == "q") { break; } try { var connection = new TcpClient(); await connection.ConnectAsync("localhost", 9091); var stream = connection.GetStream(); if (name == "f") { await stream.WriteAsync(new[] { (byte)'f' }); } else { await JsonSerializer.SerializeAsync(stream, new Sample { Id = id++, Name = name }); } connection.Close(); } catch (Exception e) { Console.WriteLine("Erro: {0}", e.Message); } }
Conclusão
O Proto.Actor é uma ferramenta poderosa para construir sistemas tolerantes a falhas, demonstrando como o Modelo de Atores simplifica concorrência, gerenciamento de recursos e recuperação de erros. Neste exemplo, exploramos:
- Estratégias de Supervisão: Uso de
OneForOneStrategy
para reiniciar atores falhos até 3 vezes. - Gerenciamento do Ciclo de Vida dos Atores: Tratamento de mensagens como
Started
,Terminated
eRestarting
. - Resiliência de Mensagens: Retentativas de operações falhas via
ResendSocketAccepted
eProcessCompleted
.
Considerações para Produção
- Leitura de Bytes:
- O método
_socket.Available
é útil para demonstrações, mas pode ser inconsistente em produção. UseMemoryStream
para lidar com dados de tamanho variável.
- O método
- Tratamento de Erros:
- Em sistemas reais, encapsule operações de socket em blocos
try-catch
e implemente logs detalhados.
- Em sistemas reais, encapsule operações de socket em blocos
- Liberação de Recursos:
- Use timeouts ou verificações de heartbeat para evitar conexões órfãs.
Top comments (0)