Introduction
In today's data-driven world, maintaining synchronized data across different systems is crucial. Change Data Capture (CDC) has emerged as a powerful pattern for tracking and propagating changes from your database in real-time. In this guide, we'll build a practical example using Debezium and Apache Kafka to create a robust CDC pipeline.
What We'll Build
We'll create a simple e-commerce scenario where order updates in a PostgreSQL database are automatically synchronized with an Elasticsearch instance for real-time search capabilities. This setup demonstrates a common real-world use case for CDC.
Prerequisites
- Docker and Docker Compose
- Java 11 or higher
- Maven
- Git
- PostgreSQL client (psql)
- curl (for testing)
Architecture Overview
Our architecture consists of several components:
- PostgreSQL database (source)
- Debezium connector
- Apache Kafka
- Kafka Connect
- Elasticsearch (target)
- Simple Spring Boot application for testing
graph LR A[PostgreSQL] -->|Debezium| B[Kafka Connect] B -->|Events| C[Kafka] C -->|Sink Connector| D[Elasticsearch] E[Spring Boot App] -->|Writes| A D -->|Search| E
Implementation Steps
1. Setting Up the Environment
First, let's create our project structure:
mkdir cdc-demo cd cdc-demo git init
Create a docker-compose.yml
file:
version: '3' services: postgres: image: debezium/postgres:13 ports: - "5432:5432" environment: - POSTGRES_DB=inventory - POSTGRES_USER=postgres - POSTGRES_PASSWORD=postgres volumes: - ./postgres/init:/docker-entrypoint-initdb.d kafka: image: confluentinc/cp-kafka:7.3.0 ports: - "9092:9092" environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 depends_on: - zookeeper zookeeper: image: confluentinc/cp-zookeeper:7.3.0 ports: - "2181:2181" environment: ZOOKEEPER_CLIENT_PORT: 2181 connect: image: debezium/connect:2.1 ports: - "8083:8083" environment: GROUP_ID: 1 CONFIG_STORAGE_TOPIC: my_connect_configs OFFSET_STORAGE_TOPIC: my_connect_offsets STATUS_STORAGE_TOPIC: my_connect_statuses BOOTSTRAP_SERVERS: kafka:29092 depends_on: - kafka - postgres elasticsearch: image: docker.elastic.co/elasticsearch/elasticsearch:7.17.0 ports: - "9200:9200" environment: - discovery.type=single-node - ES_JAVA_OPTS=-Xms512m -Xmx512m
2. Creating the Database Schema
Create postgres/init/init.sql
:
CREATE TABLE orders ( id SERIAL PRIMARY KEY, customer_id INTEGER NOT NULL, order_date TIMESTAMP NOT NULL, status VARCHAR(50) NOT NULL, total_amount DECIMAL(10,2) NOT NULL ); ALTER TABLE orders REPLICA IDENTITY FULL;
3. Configuring Debezium
After starting the containers, configure the Debezium connector:
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "database.hostname": "postgres", "database.port": "5432", "database.user": "postgres", "database.password": "postgres", "database.dbname": "inventory", "database.server.name": "dbserver1", "table.include.list": "public.orders", "plugin.name": "pgoutput" } }'
4. Spring Boot Application
Create a new Spring Boot project with the following dependencies:
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-jpa</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.postgresql</groupId> <artifactId>postgresql</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> </dependencies>
Create the Order entity:
@Entity @Table(name = "orders") public class Order { @Id @GeneratedValue(strategy = GenerationType.IDENTITY) private Long id; private Long customerId; private LocalDateTime orderDate; private String status; private BigDecimal totalAmount; // Getters, setters, and constructors }
Create a REST controller for testing:
@RestController @RequestMapping("/api/orders") public class OrderController { private final OrderRepository orderRepository; public OrderController(OrderRepository orderRepository) { this.orderRepository = orderRepository; } @PostMapping public Order createOrder(@RequestBody Order order) { order.setOrderDate(LocalDateTime.now()); return orderRepository.save(order); } @PutMapping("/{id}") public Order updateOrder(@PathVariable Long id, @RequestBody Order order) { return orderRepository.findById(id) .map(existingOrder -> { existingOrder.setStatus(order.getStatus()); existingOrder.setTotalAmount(order.getTotalAmount()); return orderRepository.save(existingOrder); }) .orElseThrow(() -> new ResponseStatusException(HttpStatus.NOT_FOUND)); } }
5. Testing the Pipeline
- Start all containers:
docker-compose up -d
- Create a test order:
curl -X POST http://localhost:8080/api/orders \ -H "Content-Type: application/json" \ -d '{ "customerId": 1, "status": "NEW", "totalAmount": 99.99 }'
- Update the order:
curl -X PUT http://localhost:8080/api/orders/1 \ -H "Content-Type: application/json" \ -d '{ "status": "PROCESSING", "totalAmount": 99.99 }'
- Check Kafka topics to verify the CDC events:
docker-compose exec kafka kafka-console-consumer \ --bootstrap-server kafka:29092 \ --topic dbserver1.public.orders \ --from-beginning
Common Challenges and Solutions
-
Data Consistency
- Use transaction logs for accurate change capture
- Implement idempotent consumers
- Handle out-of-order events
-
Performance Optimization
- Batch updates when possible
- Monitor Kafka partition lag
- Tune PostgreSQL replication slots
-
Error Handling
- Implement dead letter queues
- Set up proper monitoring and alerting
- Create retry mechanisms
Best Practices
-
Schema Evolution
- Use Avro for schema management
- Plan for backward/forward compatibility
- Test schema changes thoroughly
-
Monitoring
- Track replication lag
- Monitor Kafka consumer group offsets
- Set up alerts for connector failures
-
Security
- Use SSL/TLS for communication
- Implement proper authentication
- Follow least privilege principle
Conclusion
CDC with Debezium and Kafka provides a robust solution for real-time data synchronization. This setup can be extended to handle more complex scenarios like:
- Multi-region deployment
- Multiple target systems
- Complex transformation pipelines
- High availability requirements
Top comments (0)