Confluent.Kafka version: '3.8' services: kafka: image: confluentinc/cp-kafka:6.0.14 depends_on: - zookeeper ports: - '29092:29092' environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka:9092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:29092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 kafka-ui: image: provectuslabs/kafka-ui:latest ports: - 8085:8080 environment: KAFKA_CLUSTERS_0_NAME: local KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092 DYNAMIC_CONFIG_ENABLED: 'true' zookeeper: image: confluentinc/cp-zookeeper:6.0.14 ports: - '22181:2181' environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 public class MessageProducerService { public async Task SendMessageAsync(string topic, CustomerDto customer) { var config = new ProducerConfig { BootstrapServers = "localhost:29092", ClientId = "OrderClient", Acks = Acks.All }; var message = new Message<string, string> { Key = customer.Id, Value = JsonSerializer.Serialize(customer) }; using var producer = new ProducerBuilder<string, string>(config).Build(); await producer.ProduceAsync(topic, message); } }[Route("api/[controller]")] [ApiController] public class CustomersController(CustomerDbContext context, MessageProducerService producerService) : ControllerBase { [HttpGet] public async Task<IActionResult>GetCustomers() { return Ok(await context.Customers.ToListAsync()); } [HttpGet("{customerId}")] public async Task<IActionResult>CreateCustomerOrder(string customerId) { var customer = await context.Customers.SingleOrDefaultAsync(_ => _.Id == customerId); if (customer is null) return NotFound(); await producerService.SendMessageAsync(MessageTopic.CREATE_ORDER, new CustomerDto(customer.Id, customer.Name, customer.Surname, customer.Balance)); return Ok(); } }public class Worker : BackgroundService { private readonly ILogger<Worker> _logger; private readonly OrderDbContext _context; public Worker(ILogger<Worker> logger, OrderDbContext context) { _logger = logger; _context = context; } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { var config = new ConsumerConfig { BootstrapServers = "localhost:29092", AutoOffsetReset = AutoOffsetReset.Earliest, ClientId = "OrderClient", GroupId = "OrderGroup" }; using var consumer = new ConsumerBuilder<string, string>(config).Build(); consumer.Subscribe(MessageTopic.CREATE_ORDER); _logger.LogInformation("Connected kafka"); while (!stoppingToken.IsCancellationRequested) { var data = consumer.Consume(); if (data is not null) { var customer = JsonSerializer.Deserialize<CustomerDto>(data.Message.Value); if (customer!.Balance > 0) { _context.Orders.Add(new Order(customer.Id, DateTime.Now)); await _context.SaveChangesAsync(stoppingToken); _logger.LogInformation($"{customer.Id} Customer's order has been created successfully."); } else { _logger.LogInformation($"{customer.Id} Customer's balance is less than 0!"); } } _logger.LogInformation($"There is no message"); } } }
