César Fabián CHÁVEZ LINARES
Building Real-Time Data Pipelines with Debezium and Kafka: A Practical Guide

Introduction

Keeping data synchronized across systems is a recurring problem. Change Data Capture (CDC) addresses it by reading changes from the database's transaction log and propagating them downstream in near real time. In this guide, we'll build a practical CDC pipeline using Debezium and Apache Kafka.

What We'll Build

We'll create a simple e-commerce scenario where order updates in a PostgreSQL database are automatically synchronized with an Elasticsearch instance for real-time search capabilities. This setup demonstrates a common real-world use case for CDC.

Prerequisites

  • Docker and Docker Compose
  • Java 11 or higher
  • Maven
  • Git
  • PostgreSQL client (psql)
  • curl (for testing)

Architecture Overview

Our architecture consists of several components:

  1. PostgreSQL database (source)
  2. Debezium connector
  3. Apache Kafka
  4. Kafka Connect
  5. Elasticsearch (target)
  6. Simple Spring Boot application for testing

    graph LR
        A[PostgreSQL] -->|Debezium| B[Kafka Connect]
        B -->|Events| C[Kafka]
        C -->|Sink Connector| D[Elasticsearch]
        E[Spring Boot App] -->|Writes| A
        D -->|Search| E

Implementation Steps

1. Setting Up the Environment

First, let's create our project structure:

    mkdir cdc-demo
    cd cdc-demo
    git init

Create a docker-compose.yml file:

    version: '3'
    services:
      postgres:
        image: debezium/postgres:13
        ports:
          - "5432:5432"
        environment:
          - POSTGRES_DB=inventory
          - POSTGRES_USER=postgres
          - POSTGRES_PASSWORD=postgres
        volumes:
          - ./postgres/init:/docker-entrypoint-initdb.d

      kafka:
        image: confluentinc/cp-kafka:7.3.0
        ports:
          - "9092:9092"
        environment:
          KAFKA_BROKER_ID: 1
          KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
        depends_on:
          - zookeeper

      zookeeper:
        image: confluentinc/cp-zookeeper:7.3.0
        ports:
          - "2181:2181"
        environment:
          ZOOKEEPER_CLIENT_PORT: 2181

      connect:
        image: debezium/connect:2.1
        ports:
          - "8083:8083"
        environment:
          GROUP_ID: 1
          CONFIG_STORAGE_TOPIC: my_connect_configs
          OFFSET_STORAGE_TOPIC: my_connect_offsets
          STATUS_STORAGE_TOPIC: my_connect_statuses
          BOOTSTRAP_SERVERS: kafka:29092
        depends_on:
          - kafka
          - postgres

      elasticsearch:
        image: docker.elastic.co/elasticsearch/elasticsearch:7.17.0
        ports:
          - "9200:9200"
        environment:
          - discovery.type=single-node
          - ES_JAVA_OPTS=-Xms512m -Xmx512m

2. Creating the Database Schema

Create postgres/init/init.sql:

    CREATE TABLE orders (
        id SERIAL PRIMARY KEY,
        customer_id INTEGER NOT NULL,
        order_date TIMESTAMP NOT NULL,
        status VARCHAR(50) NOT NULL,
        total_amount DECIMAL(10,2) NOT NULL
    );

    -- Log the complete previous row on UPDATE and DELETE so that
    -- change events carry a full "before" image, not just the key.
    ALTER TABLE orders REPLICA IDENTITY FULL;

3. Configuring Debezium

After starting the containers (see step 5), register the Debezium connector through the Kafka Connect REST API. Debezium 2.x requires the topic.prefix property, which replaced database.server.name from the 1.x releases; its value becomes the prefix of every topic name, so changes to public.orders land on dbserver1.public.orders.

    curl -X POST http://localhost:8083/connectors \
      -H "Content-Type: application/json" \
      -d '{
        "name": "inventory-connector",
        "config": {
          "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
          "database.hostname": "postgres",
          "database.port": "5432",
          "database.user": "postgres",
          "database.password": "postgres",
          "database.dbname": "inventory",
          "topic.prefix": "dbserver1",
          "table.include.list": "public.orders",
          "plugin.name": "pgoutput"
        }
      }'
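
Once the connector is registered, it helps to confirm that it is actually running before testing anything else. Kafka Connect exposes a status endpoint at /connectors/{name}/status for this. The following is a minimal sketch using the JDK 11 HTTP client; the class and method names (ConnectorStatusCheck, statusRequest, fetchStatus) are hypothetical helpers invented for this example, and the base URL assumes the port mapping from our docker-compose.yml.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/** Hypothetical helper (not part of Debezium or Kafka Connect). */
final class ConnectorStatusCheck {
    private ConnectorStatusCheck() {}

    /** Builds a GET request for the Kafka Connect status endpoint of one connector. */
    static HttpRequest statusRequest(String connectBaseUrl, String connectorName) {
        URI uri = URI.create(connectBaseUrl + "/connectors/" + connectorName + "/status");
        return HttpRequest.newBuilder(uri).GET().build();
    }

    /** Sends the request and returns the raw JSON body describing connector and task state. */
    static String fetchStatus(String connectBaseUrl, String connectorName)
            throws IOException, InterruptedException {
        HttpClient client = HttpClient.newHttpClient();
        HttpResponse<String> response = client.send(
                statusRequest(connectBaseUrl, connectorName),
                HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() != 200) {
            throw new IOException("Unexpected HTTP status " + response.statusCode());
        }
        return response.body();
    }
}
```

Calling ConnectorStatusCheck.fetchStatus("http://localhost:8083", "inventory-connector") while the stack is up should return a JSON document in which the connector and its task report a state such as RUNNING or FAILED.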

4. Spring Boot Application

Create a new Spring Boot project with the following dependencies:

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
    </dependencies>

Create the Order entity:

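    import java.math.BigDecimal;
    import java.time.LocalDateTime;
    // Assumes Spring Boot 2.x; on Spring Boot 3+ use jakarta.persistence instead
    import javax.persistence.Entity;
    import javax.persistence.GeneratedValue;
    import javax.persistence.GenerationType;
    import javax.persistence.Id;
    import javax.persistence.Table;

    @Entity
    @Table(name = "orders")
    public class Order {
        @Id
        @GeneratedValue(strategy = GenerationType.IDENTITY)
        private Long id;
        private Long customerId;
        private LocalDateTime orderDate;
        private String status;
        private BigDecimal totalAmount;

        // Getters, setters, and constructors
    }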

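Create a REST controller for testing. It relies on an OrderRepository, which can be a Spring Data interface extending JpaRepository<Order, Long>:

    import java.time.LocalDateTime;
    import org.springframework.http.HttpStatus;
    import org.springframework.web.bind.annotation.PathVariable;
    import org.springframework.web.bind.annotation.PostMapping;
    import org.springframework.web.bind.annotation.PutMapping;
    import org.springframework.web.bind.annotation.RequestBody;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    import org.springframework.web.server.ResponseStatusException;

    @RestController
    @RequestMapping("/api/orders")
    public class OrderController {
        private final OrderRepository orderRepository;

        public OrderController(OrderRepository orderRepository) {
            this.orderRepository = orderRepository;
        }

        // Creates an order; the server sets the order date
        @PostMapping
        public Order createOrder(@RequestBody Order order) {
            order.setOrderDate(LocalDateTime.now());
            return orderRepository.save(order);
        }

        // Updates status and total of an existing order, or returns 404
        @PutMapping("/{id}")
        public Order updateOrder(@PathVariable Long id, @RequestBody Order order) {
            return orderRepository.findById(id)
                .map(existingOrder -> {
                    existingOrder.setStatus(order.getStatus());
                    existingOrder.setTotalAmount(order.getTotalAmount());
                    return orderRepository.save(existingOrder);
                })
                .orElseThrow(() -> new ResponseStatusException(HttpStatus.NOT_FOUND));
        }
    }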

5. Testing the Pipeline

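  1. Start all containers, then register the connector from step 3 and start the Spring Boot application (it listens on port 8080 by default and needs its datasource pointed at the inventory database):

    docker-compose up -d
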
  2. Create a test order:

    curl -X POST http://localhost:8080/api/orders \
      -H "Content-Type: application/json" \
      -d '{ "customerId": 1, "status": "NEW", "totalAmount": 99.99 }'

  3. Update the order:

    curl -X PUT http://localhost:8080/api/orders/1 \
      -H "Content-Type: application/json" \
      -d '{ "status": "PROCESSING", "totalAmount": 99.99 }'

  4. Check Kafka topics to verify the CDC events:

    docker-compose exec kafka kafka-console-consumer \
      --bootstrap-server kafka:29092 \
      --topic dbserver1.public.orders \
      --from-beginning
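
Each message on dbserver1.public.orders is a Debezium change event whose payload carries an op code together with before and after row images. Debezium uses c for inserts, u for updates, d for deletes, r for rows read during the initial snapshot, and t for truncates. To make that concrete, here is a small sketch of how a consumer might decide which row image to write to the target. ChangeOp and ChangeEvents are hypothetical names made up for this example, and the row images are modeled as plain maps rather than parsed JSON.

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical enum mapping Debezium "op" codes to readable names. */
enum ChangeOp {
    CREATE("c"), UPDATE("u"), DELETE("d"), SNAPSHOT_READ("r"), TRUNCATE("t");

    private final String code;

    ChangeOp(String code) {
        this.code = code;
    }

    /** Looks up the operation for an op code taken from a change event. */
    static ChangeOp fromCode(String code) {
        for (ChangeOp op : values()) {
            if (op.code.equals(code)) {
                return op;
            }
        }
        throw new IllegalArgumentException("Unknown op code: " + code);
    }
}

/** Hypothetical helper for interpreting change events. */
final class ChangeEvents {
    private ChangeEvents() {}

    /**
     * Returns the row image to upsert into the target system.
     * Empty means there is nothing to upsert (delete or truncate).
     */
    static Optional<Map<String, Object>> rowToIndex(
            String opCode, Map<String, Object> before, Map<String, Object> after) {
        switch (ChangeOp.fromCode(opCode)) {
            case CREATE:
            case UPDATE:
            case SNAPSHOT_READ:
                return Optional.ofNullable(after);
            default:
                // DELETE and TRUNCATE carry no new row state
                return Optional.empty();
        }
    }
}
```

For a delete, a real consumer would typically take the key from the before image and remove the matching document from the target instead of upserting.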

Common Challenges and Solutions

  1. Data Consistency

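    • Capture changes from the transaction log (as Debezium does) rather than by polling, so intermediate updates are not missed
    • Implement idempotent consumers: Debezium delivers at least once, so duplicates can appear after a restart
    • Handle out-of-order events, for example by comparing the source LSN before applying a change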
  2. Performance Optimization

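    • Batch writes to the target system when possible
    • Monitor Kafka consumer lag per partition
    • Watch PostgreSQL replication slots: a slot whose consumer has stopped retains WAL and can fill the disk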
  3. Error Handling

    • Implement dead letter queues
    • Set up proper monitoring and alerting
    • Create retry mechanisms
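
To illustrate the idempotent consumer and out-of-order points above, here is a minimal sketch that applies a change only when its log sequence number (LSN) is newer than the last one applied for the same order. Debezium's PostgreSQL events include an lsn field in their source block, which is what the lsn parameter is assumed to hold. IdempotentOrderSink is a hypothetical class for this example, and it keeps its state in memory, which a real consumer would replace with durable storage or with versioned writes in the target system.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory sink that ignores duplicate and stale change events. */
final class IdempotentOrderSink {
    // Highest LSN applied so far, per order id
    private final Map<Long, Long> lastAppliedLsn = new HashMap<>();
    // Current status per order id (stands in for the real target system)
    private final Map<Long, String> statusByOrderId = new HashMap<>();

    /**
     * Applies the change unless an event with the same or a newer LSN
     * was already applied for this order. Returns true if it was applied.
     */
    boolean apply(long orderId, long lsn, String status) {
        Long seen = lastAppliedLsn.get(orderId);
        if (seen != null && lsn <= seen) {
            return false; // duplicate delivery or out-of-order (stale) event
        }
        lastAppliedLsn.put(orderId, lsn);
        statusByOrderId.put(orderId, status);
        return true;
    }

    /** Returns the last applied status, or null if the order is unknown. */
    String statusOf(long orderId) {
        return statusByOrderId.get(orderId);
    }
}
```

With this guard, replaying the same event twice or receiving an older event after a newer one leaves the stored state unchanged.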

Best Practices

  1. Schema Evolution

    • Use Avro for schema management
    • Plan for backward/forward compatibility
    • Test schema changes thoroughly
  2. Monitoring

    • Track replication lag
    • Monitor Kafka consumer group offsets
    • Set up alerts for connector failures
  3. Security

    • Use SSL/TLS for communication
    • Implement proper authentication
    • Follow least privilege principle
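
As one way to track replication lag, we can compare the server's current WAL position with the position the Debezium slot has confirmed, for instance the values returned by pg_current_wal_lsn() and the confirmed_flush_lsn column of pg_replication_slots. PostgreSQL prints an LSN as two hexadecimal numbers separated by a slash (the high and low 32 bits of a 64-bit position), so the lag in bytes is a simple subtraction. The sketch below assumes the two LSN strings have already been fetched; LsnLag and its methods are hypothetical names for this example.

```java
/** Hypothetical helper for computing replication lag from PostgreSQL LSN strings. */
final class LsnLag {
    private LsnLag() {}

    /** Parses an LSN such as "0/16B3748" into its 64-bit byte position. */
    static long parseLsn(String lsn) {
        String[] parts = lsn.split("/");
        if (parts.length != 2) {
            throw new IllegalArgumentException("Not a valid LSN: " + lsn);
        }
        long high = Long.parseLong(parts[0], 16); // upper 32 bits
        long low = Long.parseLong(parts[1], 16);  // lower 32 bits
        return (high << 32) | low;
    }

    /** Returns how many bytes of WAL the slot still has to confirm. */
    static long lagBytes(String currentWalLsn, String confirmedFlushLsn) {
        return parseLsn(currentWalLsn) - parseLsn(confirmedFlushLsn);
    }
}
```

A value that keeps growing suggests the connector is stalled or falling behind, which makes it a reasonable candidate for an alert threshold.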

Conclusion

CDC with Debezium and Kafka provides a robust solution for real-time data synchronization. This setup can be extended to handle more complex scenarios like:

  • Multi-region deployment
  • Multiple target systems
  • Complex transformation pipelines
  • High availability requirements
