Este é o primeiro artigo de uma série onde mostrarei como conectar e configurar sistemas usando o Brighter. Neste artigo, focaremos na integração do Brighter com o Apache Kafka.
Introdução Rápida ao Kafka
O Kafka é uma plataforma distribuída de processamento de streaming projetada para alta taxa de transferência e processamento de mensagens em tempo real. Conceitos-chave incluem:
- Fluxos: Processamento de mensagens individualmente, com concorrência limitada pelo número de partições do tópico.
- Componentes Principais:
- Tópico (Topic): Uma categoria/canal para o qual mensagens são publicadas.
- Partição (Partition): Uma divisão de um tópico que permite processamento paralelo.
- Grupo de Consumidores (Consumer Group): Conjunto de consumidores que processam mensagens de um tópico de forma colaborativa.
Para integrar o Kafka com o Brighter, você precisará:
Nome do Tópico: O tópico Kafka alvo (ex: greeting.topic
).
Número de Partições: Define os limites de concorrência.
Consumer Group ID: Garante a distribuição de mensagens entre os consumidores.
Requisitos
- .NET 8 ou superior.
- Um projeto .NET com esses pacotes NuGet:
- Paramore.Brighter.MessagingGateway.Kafka: Permite integração com Kafka.
- Paramore.Brighter.ServiceActivator.Extensions.DependencyInjection: Simplifica a injeção de dependência.
- Paramore.Brighter.ServiceActivator.Extensions.Hosting: Hospeda o Brighter como serviço em segundo plano.
- Serilog.AspNetCore: Para registro estruturado.
- Docker/podman: Para configuração local do Kafka.
Configuração Local do Kafka com Docker/Podman
Use esse docker-compose.yml
para inicializar o Kafka, Zookeeper e uma interface:
services: zookeeper: image: confluentinc/cp-zookeeper ports: - "2181" environment: ZOOKEEPER_CLIENT_PORT: 2181 kafka: image: confluentinc/cp-kafka depends_on: - zookeeper healthcheck: test: kafka-topics --bootstrap-server kafka:29092 --list || exit 1 interval: 10s timeout: 10s retries: 5 ports: - "9092:9092" - "29092:29092" - "9997:9997" expose: - "29092" - "9092" environment: KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181" KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1" KAFKA_INTER_BROKER_LISTENER_NAME: "PLAINTEXT" KAFKA_MIN_INSYNC_REPLICAS: "1" kafka-ui: image: provectuslabs/kafka-ui container_name: kafka-ui depends_on: - kafka ports: - "8088:8080" environment: KAFKA_CLUSTERS_0_NAME: kafka KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092 KAFKA_CLUSTERS_0_METRICS_PORT: 9997 DYNAMIC_CONFIG_ENABLED: "true"
Passos:
- Execute
podman-compose -f docker-compose.yml up -d
(oudocker-compose
). - Acesse a interface do Kafka em
http://localhost:8088
.
Nota: Este exemplo usa PLAINTEXT
para simplicidade. Use SSL
/SASL_SSL
em produção.
Recapitulação do Brighter
Antes de prosseguir com a configuração do Kafka, vamos recapitular o que já sabemos sobre o Brighter.
Request (Comando/Evento)
O Brighter usa IRequest
para marcar objetos para processamento. Herde de Command
ou Event
:
public class Greeting : Event(Guid.NewGuid()) { public string Name { get; set; } = string.Empty; }
Mapeador de Mensagens
Traduz entre request do Brighter e mensagens do Kafka:
public class GreetingMapper : IAmAMessageMapper<Greeting> { public Message MapToMessage(Greeting request) { var header = new MessageHeader(); header.Id = request.Id; header.TimeStamp = DateTime.UtcNow; header.Topic = "greeting.topic"; // Tópico de destino para publicação header.MessageType = MessageType.MT_EVENT; var body = new MessageBody(JsonSerializer.Serialize(request)); return new Message(header, body); } public Greeting MapToRequest(Message message) { return JsonSerializer.Deserialize<Greeting>(message.Body.Bytes)!; } }
Request Handler
Processa mensagens recebidas:
public class GreetingHandler(ILogger<GreetingHandler> logger) : RequestHandler<Greeting> { public override Greeting Handle(Greeting command) { logger.LogInformation("Hello {Name}", command.Name); return base.Handle(command); } }
Configurando o Kafka com o Brighter
Conexão com o Kafka
Defina as configurações de conexão:
var connection = new KafkaMessagingGatewayConfiguration { Name = "sample", // Nome da aplicação BootStrapServers = ["localhost:9092"], // Endereço do broker SecurityProtocol = SecurityProtocol.Plaintext, // Use SSL em produção SaslMechanisms = SaslMechanism.Plain, SaslUsername = "admin", // Para autenticação SASL SaslPassword = "admin-secret" };
Consumidor Kafka
Configure Subscription
e Channel
:
.AddServiceActivator(opt => { opt.Subscriptions = [ new KafkaSubscription<Greeting>( new SubscriptionName("kafka.greeting.subscription"), // Nome da assinatura (interno) new ChannelName("greeting.topic"), // Nome do tópico new RoutingKey("greeting.topic"), // Nome do tópico groupId: "some-consumer-group", // ID do grupo de consumidores makeChannels: OnMissingChannel.Create, // Ação ao verificar se tópico existe numOfPartitions: 2, // Número de partições do tópico (útil ao criar via código) noOfPerformers: 2, // Número de subscription em paralelo (não deve ultrapassar o número de partições) isAsync: false, // true quando quiser RequestHandlerAsync ), ]; opt.ChannelFactory = new ChannelFactory(new KafkaMessageConsumerFactory(connection)); })
Produtor Kafka
Configure o external bus para publicação:
.UseExternalBus(new KafkaProducerRegistryFactory(connection, [ new KafkaPublication { MakeChannels = OnMissingChannel.Create, NumPartitions = 2, // Número de partições do tópico (útil ao criar via código) Topic = new RoutingKey("greeting.topic"), // Nome do tópico (deve ser o mesmo usado no mapeamento) }, ]).Create() );
Conclusão
A integração do Brighter com o Kafka simplifica a construção de sistemas escaláveis e orientados a mensagens.
Referências
- Código completo: Repositório GitHub
Top comments (0)