In this blog, we’ll dive into Kafka, a distributed streaming platform, and learn how to create a Producer and Consumer in .NET 6 using ASP.NET Core. We’ll cover Kafka’s core concepts, provide detailed explanations for each code snippet, and build a functional application that sends and receives messages.
What is Kafka?
Kafka is a high-throughput, distributed messaging system designed to handle real-time data streams. It has three key components:
- Producer: Sends data (messages) to Kafka topics.
- Consumer: Reads data (messages) from Kafka topics.
- Broker: A Kafka server that stores and manages incoming messages. Kafka typically runs in a cluster with multiple brokers.
Kafka organizes data into topics, which are like categories for storing messages. Messages are immutable once written, and their order is guaranteed within each partition of a topic rather than across the whole topic.
Core Kafka Concepts
- Topic: Logical channel to which messages are sent.
- Partition: Each topic is divided into partitions, which allow parallel processing and let a topic scale across brokers.
- Offset: Sequential position that uniquely identifies a message within a partition.
- Broker: Kafka server managing topics and partitions.
- Producer: Sends data to Kafka topics.
- Consumer: Reads data from Kafka topics and processes it.
- Group: Consumers are organized into groups to share load and ensure each message is processed by one consumer within the group.
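To make partitions, offsets, and groups a bit more concrete, here is a toy in-memory model of a single topic. It is only a sketch and not how Kafka is implemented: the partition count, the key-sum "hash", and the consumer names are all made up for illustration, and real clients use their own partitioners.

```csharp
using System;
using System.Collections.Generic;

// Toy in-memory model of one topic; NOT how Kafka itself works internally.
const int partitionCount = 3; // hypothetical partition count

// Each partition is an append-only log; a message's offset is its index in that log.
var partitions = new List<string>[partitionCount];
for (var i = 0; i < partitionCount; i++) partitions[i] = new List<string>();

// Choose a partition from the key so equal keys always share a partition.
// Summing characters is an illustrative stand-in for a real hash function.
int PartitionFor(string key)
{
    var sum = 0;
    foreach (var c in key) sum += c;
    return sum % partitionCount;
}

// Append a message and return its (partition, offset) coordinates.
(int Partition, int Offset) Append(string key, string value)
{
    var p = PartitionFor(key);
    partitions[p].Add(value);
    return (p, partitions[p].Count - 1);
}

var first = Append("apple", "apple #1");
var second = Append("apple", "apple #2");
var other = Append("kiwi", "kiwi #1");

Console.WriteLine($"apple #1 -> partition {first.Partition}, offset {first.Offset}");
Console.WriteLine($"apple #2 -> partition {second.Partition}, offset {second.Offset}");
Console.WriteLine($"kiwi #1  -> partition {other.Partition}, offset {other.Offset}");

// Within one consumer group, each partition is owned by exactly one consumer.
var consumers = new[] { "consumer-A", "consumer-B" }; // hypothetical group members
for (var p = 0; p < partitionCount; p++)
    Console.WriteLine($"partition {p} is read by {consumers[p % consumers.Length]}");
```

Messages that share a key land in the same partition with consecutive offsets, which is why ordering holds per partition, and splitting partitions among group members is what lets a group share the load.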
Prerequisites
- .NET 6 SDK: Download and install from the official .NET website.
- Kafka Installed: Follow the Kafka installation guide or use Docker to set up Kafka.
Getting Started with Kafka in .NET 6
Step 1: Set up Kafka
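1. Start ZooKeeper (Kafka’s dependency). The commands in this step are the Windows scripts, run from the bin\windows folder of the Kafka installation: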
zookeeper-server-start.bat ..\..\config\zookeeper.properties
2. Start Kafka:
kafka-server-start.bat ..\..\config\server.properties
3. Create a Kafka topic for this example:
kafka-topics.bat --create --topic fruit --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1
Step 2: Create a .NET 6 Project
Run the following command to create an ASP.NET Core Web API project:
dotnet new webapi -n KafkaProducerConsumer
cd KafkaProducerConsumer
Step 3: Install Kafka Library
Kafka communication in .NET is enabled by the Confluent.Kafka library. Install it via NuGet:
dotnet add package Confluent.Kafka
dotnet add package Swashbuckle.AspNetCore
Kafka in .NET 6: Step-by-Step Implementation
We will build two services:
- Kafka Producer Service: Sends messages to Kafka topics.
- Kafka Consumer Service: Continuously listens to and processes messages from Kafka topics.
Step 4: Configure Kafka Settings
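Add the Kafka settings to appsettings.json so they live in one place. (The services in this walkthrough hard-code the same values for brevity.)
{
  "Kafka": {
    "BootstrapServers": "localhost:9092",
    "Topic": "fruit",
    "GroupId": "my-consumer-group"
  }
}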
Explanation:
- BootstrapServers: Address of the Kafka broker.
- Topic: The topic where messages will be sent or received.
- GroupId: Identifies the consumer group for message processing.
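If you would rather read these three settings from configuration than hard-code them, one option is to bind the "Kafka" section onto a small typed class. The sketch below is an assumption on my part, not part of the original project: KafkaSettings is a hypothetical class, and an in-memory collection stands in for appsettings.json so the snippet runs on its own.

```csharp
using System;
using System.Collections.Generic;
using Microsoft.Extensions.Configuration;

// In-memory stand-in for the "Kafka" section of appsettings.json.
var configuration = new ConfigurationBuilder()
    .AddInMemoryCollection(new Dictionary<string, string>
    {
        ["Kafka:BootstrapServers"] = "localhost:9092",
        ["Kafka:Topic"] = "fruit",
        ["Kafka:GroupId"] = "my-consumer-group"
    })
    .Build();

// Bind the section onto a typed object.
// In Program.cs, builder.Configuration would play the role of 'configuration'.
var settings = configuration.GetSection("Kafka").Get<KafkaSettings>()
    ?? throw new InvalidOperationException("Missing 'Kafka' configuration section.");

Console.WriteLine($"{settings.BootstrapServers} / {settings.Topic} / {settings.GroupId}");

// Hypothetical settings class (not part of the original project).
public class KafkaSettings
{
    public string BootstrapServers { get; set; } = "";
    public string Topic { get; set; } = "";
    public string GroupId { get; set; } = "";
}
```

The bound object could then be passed to the producer and consumer constructors instead of the literal "localhost:9092".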
Step 5: Create the Kafka Producer Service
Create a folder named Services and add KafkaProducerService.cs.
using Confluent.Kafka;

namespace KafkaExample.Services;

public interface IKafkaProducerService
{
    Task SendMessageAsync(string topic, string message);
}

public class KafkaProducerService : IKafkaProducerService, IDisposable
{
    private readonly IProducer<Null, string> _producer;

    // Constructor to initialize Kafka producer with configuration
    public KafkaProducerService()
    {
        var config = new ProducerConfig
        {
            BootstrapServers = "localhost:9092" // Kafka server details (ensure this is correct)
        };
        _producer = new ProducerBuilder<Null, string>(config).Build();
    }

    // Method to send message to Kafka topic
    public async Task SendMessageAsync(string topic, string message)
    {
        try
        {
            // Send message to the specified Kafka topic
            await _producer.ProduceAsync(topic, new Message<Null, string> { Value = message });
            Console.WriteLine($"Message '{message}' sent to topic '{topic}'.");
        }
        catch (Exception ex)
        {
            // Log any errors encountered while sending message
            Console.WriteLine($"Error sending message to Kafka: {ex.Message}");
            throw;
        }
    }

    // Flush pending messages and release the producer when the app shuts down
    public void Dispose()
    {
        _producer.Flush(TimeSpan.FromSeconds(5));
        _producer.Dispose();
    }
}
Explanation:
- ProducerConfig: Configures the producer, specifying the Kafka broker.
- ProduceAsync: Sends a message to Kafka asynchronously.
- Null: Key is set to null since our example doesn’t use keyed messages.
- _producer: The Kafka producer instance sends messages to the specified topic.
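ProduceAsync also returns a delivery report, which is handy for checking where a message ended up. Here is a minimal standalone sketch, assuming a broker at localhost:9092 and the fruit topic from Step 1. The MessageTimeoutMs value is my own choice so the call fails quickly when no broker is reachable.

```csharp
using System;
using Confluent.Kafka;

var config = new ProducerConfig
{
    BootstrapServers = "localhost:9092",
    MessageTimeoutMs = 5000 // assumption: give up after 5 s instead of the 5-minute default
};

using var producer = new ProducerBuilder<Null, string>(config).Build();

try
{
    // The delivery result carries the partition and offset assigned by the broker.
    var result = await producer.ProduceAsync(
        "fruit", new Message<Null, string> { Value = "apple" });

    Console.WriteLine($"Delivered to {result.TopicPartitionOffset} (status: {result.Status})");
}
catch (ProduceException<Null, string> ex)
{
    // Thrown when delivery fails, for example when the broker cannot be reached in time.
    Console.WriteLine($"Delivery failed: {ex.Error.Reason}");
}
```

Catching ProduceException rather than a bare Exception gives access to the Kafka error details through ex.Error.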
Step 6: Create the Kafka Consumer Service
Add KafkaConsumerService.cs to the Services folder.
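using Confluent.Kafka;
using System;
using System.Threading;

namespace KafkaExample.Services
{
    public class KafkaConsumerService
    {
        private readonly IConsumer<Null, string> _consumer;

        public KafkaConsumerService()
        {
            var config = new ConsumerConfig
            {
                BootstrapServers = "localhost:9092",
                GroupId = "my-consumer-group",
                // Start from the earliest message when the group has no committed offset
                AutoOffsetReset = AutoOffsetReset.Earliest
            };
            _consumer = new ConsumerBuilder<Null, string>(config).Build();
        }

        // Blocks the calling thread until cancellation is requested or a fatal error occurs
        public void ConsumeMessages(string topic, CancellationToken cancellationToken = default)
        {
            _consumer.Subscribe(topic);
            try
            {
                while (!cancellationToken.IsCancellationRequested)
                {
                    try
                    {
                        var consumeResult = _consumer.Consume(cancellationToken);
                        Console.WriteLine($"Consumed message: {consumeResult.Message.Value}");
                    }
                    catch (ConsumeException e)
                    {
                        // Log the error; keep consuming unless it is fatal
                        Console.WriteLine($"Error consuming message: {e.Error.Reason}");
                        if (e.Error.IsFatal) break;
                    }
                }
            }
            catch (OperationCanceledException)
            {
                // Expected when the token is cancelled
            }
            finally
            {
                // Leave the consumer group cleanly
                _consumer.Close();
            }
        }
    }
}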
Explanation:
- ConsumerConfig: Configures the consumer to connect to the broker and specify the consumer group.
- Subscribe: Subscribes the consumer to a topic.
- Consume: Blocks until the next message is available on the topic, then returns it.
- AutoOffsetReset.Earliest: Ensures the consumer starts reading messages from the beginning of the topic if no offsets exist.
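ConsumeMessages blocks its calling thread, so something has to run it in the background. One option in ASP.NET Core is a hosted service. The sketch below is an assumption rather than part of the original project: KafkaConsumerWorker is a hypothetical class name, and it reuses the broker address, group, and fruit topic from this walkthrough.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;

// Hypothetical hosted service (not in the original project) that owns the consume loop.
public class KafkaConsumerWorker : BackgroundService
{
    private const string Topic = "fruit";

    // Consume() blocks, so the loop runs on a thread-pool task instead of the startup path.
    protected override Task ExecuteAsync(CancellationToken stoppingToken) =>
        Task.Run(() => ConsumeLoop(stoppingToken), stoppingToken);

    private static void ConsumeLoop(CancellationToken stoppingToken)
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092",
            GroupId = "my-consumer-group",
            AutoOffsetReset = AutoOffsetReset.Earliest
        };

        using var consumer = new ConsumerBuilder<Null, string>(config).Build();
        consumer.Subscribe(Topic);

        try
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                // Throws OperationCanceledException when the host is shutting down.
                var result = consumer.Consume(stoppingToken);
                Console.WriteLine($"Consumed message: {result.Message.Value}");
            }
        }
        catch (OperationCanceledException)
        {
            // Normal shutdown path.
        }
        finally
        {
            // Leave the consumer group cleanly.
            consumer.Close();
        }
    }
}
```

A worker like this would be registered with builder.Services.AddHostedService<KafkaConsumerWorker>(); in Program.cs, after which the host starts it and stops it together with the application.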
Step 7: Register Services in Program.cs
Add both Kafka services to the application in Program.cs.
using KafkaExample.Services;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

var builder = WebApplication.CreateBuilder(args);

// Register Kafka producer service with Dependency Injection
builder.Services.AddSingleton<IKafkaProducerService, KafkaProducerService>();

// Register Kafka consumer service so it can be resolved and started below
builder.Services.AddSingleton<KafkaConsumerService>();

// Add controllers (required for API endpoints)
builder.Services.AddControllers();

// Add Swagger for API documentation
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();

var app = builder.Build();

// Configure Swagger in development environment
if (app.Environment.IsDevelopment())
{
    app.UseSwagger();
    app.UseSwaggerUI();
}

// Use HTTPS Redirection
app.UseHttpsRedirection();

// Configure the HTTP request pipeline to use controllers
app.MapControllers();

// ConsumeMessages blocks, so run it on a background task; "fruit" is the topic created in Step 1
var consumerService = app.Services.GetRequiredService<KafkaConsumerService>();
_ = Task.Run(() => consumerService.ConsumeMessages("fruit"));

app.Run();
Step 8: Create an API Endpoint for Sending Messages
Add a KafkaController.cs in the Controllers folder to handle message requests.
using KafkaExample.Services;
using Microsoft.AspNetCore.Mvc;

namespace KafkaProducerConsumer.Controllers
{
    [ApiController]
    [Route("api/[controller]")]
    public class KafkaController : ControllerBase
    {
        private readonly IKafkaProducerService _producerService;

        public KafkaController(IKafkaProducerService producerService)
        {
            _producerService = producerService;
        }

        [HttpPost("send")]
        public async Task<IActionResult> SendMessage([FromQuery] string topic, [FromQuery] string message)
        {
            if (string.IsNullOrEmpty(topic) || string.IsNullOrEmpty(message))
            {
                return BadRequest("Both 'topic' and 'message' query parameters are required.");
            }

            await _producerService.SendMessageAsync(topic, message);
            return Ok($"Message '{message}' sent successfully to topic '{topic}'.");
        }
    }
}
Explanation:
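This API accepts a topic and a message as query parameters, returns 400 Bad Request if either is missing, and otherwise passes the message to the Kafka producer.
At this point, the project should contain Controllers/KafkaController.cs, Services/KafkaProducerService.cs, Services/KafkaConsumerService.cs, Program.cs, and appsettings.json.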
Step 9: Run and Test
1. Start the Kafka and ZooKeeper servers.
2. Run the .NET application:
dotnet run
3. Use a REST client like Postman to send a message (use the port printed by dotnet run if yours differs):
POST http://localhost:5292/api/kafka/send?topic=fruit&message=apple
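If you prefer to test from code instead of Postman, the same request can be sent with HttpClient. This is a small sketch, assuming the API is listening on http://localhost:5292 as in the request above. BuildSendUri is a hypothetical helper that escapes the query values.

```csharp
using System;
using System.Net.Http;

// Hypothetical helper: builds the request URI and escapes the query values.
static Uri BuildSendUri(string baseUrl, string topic, string message) =>
    new Uri($"{baseUrl}/api/kafka/send" +
            $"?topic={Uri.EscapeDataString(topic)}&message={Uri.EscapeDataString(message)}");

var uri = BuildSendUri("http://localhost:5292", "fruit", "apple");

using var client = new HttpClient();
try
{
    // The endpoint reads query parameters, so the POST body is left empty.
    using var response = await client.PostAsync(uri, content: null);
    var body = await response.Content.ReadAsStringAsync();
    Console.WriteLine($"{(int)response.StatusCode}: {body}");
}
catch (HttpRequestException ex)
{
    // Typically means the API is not running or listens on a different port.
    Console.WriteLine($"Request failed: {ex.Message}");
}
```

Escaping matters as soon as a message contains spaces or characters such as & that would otherwise break the query string.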
Kafka Workflow Recap
- The producer sends the message to the fruit topic.
- The Kafka broker receives and stores the message.
- The consumer reads the message from the topic and processes it.
This demonstrates the basic producer-consumer pattern in Kafka, integrated with a .NET 6 application.
You can find the source code on GitHub.
Conclusion
In this blog, we explored Kafka concepts and implemented a real-time producer-consumer application in .NET 6. Kafka is highly scalable, fault-tolerant, and suitable for distributed systems. This example can be extended to include advanced features like message keying, batch processing, error handling, and monitoring.
Happy coding!