Roman Tsypuk for AWS Community Builders

Originally published at tsypuk.github.io

CDC in AWS: Change Data Capture from AWS RDS MySQL into an AWS MSK Kafka topic using Debezium

Kafka Connect is a powerful open-source framework for streaming data into and out of Kafka, and it is the foundation for Change Data Capture (CDC): real-time event streaming from databases like MySQL.
In this post, we'll explore how to set up Debezium, one of the most popular open-source CDC platforms, to capture changes from an AWS RDS MySQL instance and publish them to a Kafka topic.
We'll also dive into the Debezium message format and explain the difference between standalone and cluster modes.

What problem does it solve

Debezium allows you to:

  • Stream real-time changes from a MySQL RDS instance.
  • Track inserts, updates, and deletes as they occur.
  • Publish these changes to Kafka topics for downstream processing.

[Infrastructure diagram: RDS MySQL, Debezium on Kafka Connect, AWS MSK]

This is particularly useful for building event-driven architectures, data pipelines, and synchronizing databases with other systems.

Besides Debezium, there are multiple open-source connectors available on the Confluent platform that provide integration points with different sink and source systems, such as AWS S3, Elasticsearch, MongoDB, etc.

Setting Up Debezium for MySQL RDS

1. Enable Binary Logging on MySQL RDS:

Log in to your RDS instance and ensure binary logging is enabled in your parameter group.
Configure the following parameters:

  • binlog_format = ROW
  • binlog_row_image = FULL

  • Ensure the binlog retention period is set to a sufficient duration for your use case; on RDS this is configured with the mysql.rds_set_configuration stored procedure (see the example after this list).
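On RDS MySQL, for example, the retention window can be inspected and extended with the built-in stored procedures (a minimal sketch; the 24-hour value is an assumption, tune it so Debezium can catch up after downtime):

-- Show the current RDS configuration, including 'binlog retention hours'
CALL mysql.rds_show_configuration;

-- Keep binlog files for 24 hours so the connector can resume after restarts
CALL mysql.rds_set_configuration('binlog retention hours', 24);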

SHOW VARIABLES LIKE 'binlog_format';

Variable_name    Value
binlog_format    ROW

2. Verify the binlog configuration:

SHOW VARIABLES LIKE 'log_bin%';
SHOW VARIABLES LIKE 'binlog_format';
SHOW VARIABLES LIKE 'binlog_row_image';

If binlog_format is not yet ROW, switch it:

SET SESSION binlog_format = 'ROW';
SET GLOBAL binlog_format = 'ROW';

3. Run Debezium with Kafka Connect:

Use Docker Compose to start Debezium and Kafka Connect:

services:
  debezium:
    image: debezium/connect:2.7.3.Final
    ports:
      - "8083:8083"
    healthcheck:
      test: [ "CMD-SHELL", "curl -f http://localhost:8083/ || exit 1" ]
    networks:
      - kafka-cluster
    environment:
      - BOOTSTRAP_SERVERS=b-4.kafka-dev-0.xxxx.c6.kafka.us-east-1.amazonaws.com:9092,b-3.kafka-dev-0.xxxx.c6.kafka.us-east-1.amazonaws.com:9092,b-1.kafka-dev-0.xxxx.c6.kafka.us-east-1.amazonaws.com:9092
      - GROUP_ID=MYSQL_1
      - CONFIG_STORAGE_TOPIC=debezium_connect_configs
      - OFFSET_STORAGE_TOPIC=debezium_connect_offsets
      - STATUS_STORAGE_TOPIC=debezium_source_connect_statuses
      - CONFIG_STORAGE_REPLICATION_FACTOR=1
      - OFFSET_STORAGE_REPLICATION_FACTOR=1
      - STATUS_STORAGE_REPLICATION_FACTOR=1

networks:
  kafka-cluster:
    driver: bridge

CONFIG_STORAGE_TOPIC, OFFSET_STORAGE_TOPIC, and STATUS_STORAGE_TOPIC are Kafka topics where the connector stores its own configuration, source offsets, and status, and through which Kafka Connect workers synchronize.
CONFIG_STORAGE_REPLICATION_FACTOR, OFFSET_STORAGE_REPLICATION_FACTOR, and STATUS_STORAGE_REPLICATION_FACTOR are set to 1 here for a development setup; in production they should match your durability requirements (typically 3).
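Once the worker starts, you can check that these internal topics were created (a quick sanity check; the CLI script name and bootstrap address depend on your client installation and MSK endpoints):

kafka-topics.sh --bootstrap-server b-1.kafka-dev-0.xxxx.c6.kafka.us-east-1.amazonaws.com:9092 --list | grep debezium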

4. Detect the server_id that will be used in the connector configuration

SHOW VARIABLES LIKE 'server_id'; 
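The same value can also be read with a plain query (the output below is illustrative; your instance reports its own ID):

SELECT @@server_id;

@@server_id
1234567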

5. Configure the MySQL Connector

Kafka Connect exposes an HTTP port (8083 in our compose file) for creating and monitoring connectors.

Create a MySQL source connector by sending a POST request to the Kafka Connect REST API:

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d @../payload.json

{
  "name": "mysql-moderation-comments-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "xxxx.us-east-1.rds.amazonaws.com",
    "database.port": "3306",
    "database.user": "user",
    "database.password": "password",
    "database.server.id": "this_is_mysql_server_id",
    "topic.prefix": "mysql-cdc",
    "database.include.list": "public",
    "table.include.list": "public.users",
    "schema.history.internal.kafka.bootstrap.servers": "b-4.kafka-dev-0.xxxx.c6.kafka.us-east-1.amazonaws.com:9092,b-3.kafka-dev-0.xxxx.c6.kafka.us-east-1.amazonaws.com:9092,b-1.kafka-dev-0.xxxx.c6.kafka.us-east-1.amazonaws.com:9092",
    "schema.history.internal.kafka.topic": "schema-changes.moderation",
    "include.schema.changes": true,
    "key.converter.schemas.enable": false,
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false,
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "database.history.skip.unparseable.ddl": true
  }
}
Parameter details:

  • connector.class: class name of the connector on the JAVA_PATH inside the running container. There are pre-built connectors ("plugins") available; we can also add any existing plugin to the container or write our own.
  • tasks.max: only one task should run at a time, to preserve the proper order and handling of the binlog. The Kafka Connect service uses connectors to start one or more tasks that do the actual work, and distributes running tasks across the cluster of Kafka Connect services. If any service stops or crashes, its tasks are redistributed to the other running services.
  • database.hostname: RDS endpoint IP or DNS name.
  • database.port: MySQL port (3306 by default).
  • database.user / database.password: credentials of a database user with replication privileges.
  • database.server.id: unique identifier of the MySQL server; this is the master server ID detected above.
  • database.server.name: logical ID of the server or cluster of services, used as a prefix for all Kafka topics.
  • topic.prefix: prefix added to Kafka topic names to distinguish them from other existing topics ("mysql-cdc" in our config).
  • database.include.list: comma-separated list of databases whose changes should be captured.
  • schema.history.internal.kafka.topic: the connector puts all DDL statements in this topic while reading the binlog. On restart, the connector recovers the schema of the database as it existed at that point in time.
  • key.converter: class name of the converter or transformer for the event key.
  • value.converter: converter for the event value.
  • table.include.list: comma-separated list of tables to capture (public.users in our config).
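With the connector created, the same REST API lets us verify that it and its single task are RUNNING (standard Kafka Connect endpoints, using the connector name from the payload above):

curl http://localhost:8083/connectors
curl http://localhost:8083/connectors/mysql-moderation-comments-connector/status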

6. Export data from the MySQL table

Once the connector is set up, it creates a snapshot of the existing data and ingests it into Kafka. After that, the connector monitors the binlog and captures new inserts, updates, and deletes.

debezium-1 | INFO MySQL|mysql-cdc|snapshot Exported 496538 of 548957 records for table 'public.users' after 00:41:53.279 [io.debezium.relational.RelationalSnapshotChangeEventSource]
debezium-1 | INFO MySQL|mysql-cdc|snapshot Exported 605082 of 548957 records for table 'public.users' after 00:53:07.497 [io.debezium.relational.RelationalSnapshotChangeEventSource]
debezium-1 | INFO MySQL|mysql-cdc|snapshot Finished exporting 605274 records for table 'public.users' (1 of 1 tables); total duration '00:53:07.522' [io.debezium.relational.RelationalSnapshotChangeEventSource]
debezium-1 | INFO MySQL|mysql-cdc|snapshot Releasing table read lock to enable MySQL writes [io.debezium.connector.binlog.BinlogSnapshotChangeEventSource]
debezium-1 | INFO MySQL|mysql-cdc|snapshot Writes to MySQL tables prevented for a total of 00:53:13.755 [io.debezium.connector.binlog.BinlogSnapshotChangeEventSource]
debezium-1 | INFO || WorkerSourceTask{id=mysql-moderation-comments-connector-0} Committing offsets for 10242 acknowledged messages [org.apache.kafka.connect.runtime.WorkerSourceTask]

7. Verify Kafka Topic

Use kafka-console-consumer to check the topic for messages:

kafka-console-consumer --bootstrap-server localhost:9092 --topic mysql-cdc.your_database.your_table --from-beginning 
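To also inspect the message keys, which carry the row identifiers, the console consumer can print them alongside the values (standard consumer flags; adjust the bootstrap address for your MSK endpoints):

kafka-console-consumer --bootstrap-server localhost:9092 --topic mysql-cdc.your_database.your_table --from-beginning --property print.key=true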

Debezium Data Format

Debezium emits messages to Kafka in a JSON format with two main parts:

  • key (identifies the specific database row)
  • value (contains the actual change event, with fields like before, after, op, ts_ms, etc.)

Fields:

  • before: State of the row before the change.
  • after: State of the row after the change.
  • op: Type of operation (c for create, u for update, d for delete).
  • source: Metadata about the event source.
  • ts_ms: Timestamp of the event.
{ "op": "u", "source": { ... }, "ts_ms": "...", "ts_us": "...", "ts_ns": "...", "before": { "userid": "1", "name": "bob" }, "after": { "userid": "1", "field2": "alice" } } 
Enter fullscreen mode Exit fullscreen mode
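For completeness, a delete event is the inverse shape: op is "d", before holds the last row state, and after is null (a sketch of the shape; by default Debezium also follows it with a tombstone message whose value is null, so compacted topics can drop the key):

{
  "op": "d",
  "source": { ... },
  "ts_ms": "...",
  "before": {
    "userid": "1",
    "name": "alice"
  },
  "after": null
}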

Hide schema from payload

There are multiple configurations and data formats that allow you to transform events, hide unneeded fields, register custom Single Message Transforms (SMTs), and so on.

Here we are instructing Debezium to exclude the schema from the payload (shown in Debezium Server property syntax; the connector JSON above achieves the same with value.converter.schemas.enable set to false):

debezium.source.value.converter=org.apache.kafka.connect.json.JsonConverter
debezium.source.value.converter.schemas.enable=false
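The effect with the plain JsonConverter: when schemas.enable is true, every record is wrapped in a schema/payload envelope; with false, you get just the event itself (illustrative shapes, not complete messages):

With schema enabled:
{
  "schema": { "type": "struct", "fields": [ ... ] },
  "payload": { "op": "u", "before": { ... }, "after": { ... } }
}

With schema disabled:
{ "op": "u", "before": { ... }, "after": { ... } }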

Conclusion

Debezium is an excellent choice for capturing data changes in real time from MySQL RDS and streaming them into Kafka.
Its support for schema change tracking, rich message format, and scalability make it ideal for modern data pipelines.

Whether you choose standalone mode for simplicity or cluster mode for fault tolerance, Debezium provides the flexibility to meet your needs.

