如何使用Kafka Connect实现同步RDS binlog数据

简介: 本文介绍如何在E-MapReduce上使用Kafka Connect实现同步RDS binlog数据

1. 背景

在我们的业务开发中,往往会碰到下面这个场景:

  • 业务更新数据写到数据库中
  • 业务更新数据需要实时传递给下游依赖处理

所以传统的处理架构可能会这样:

image

但这个架构也存在着不少弊端:我们需要在项目中维护很多发送消息的代码。新增或者更新消息都会带来不少维护成本。所以,更好的处理方式应该是直接将数据库的数据接入到流式系统中,如下图:
image

本文将演示如何在E-MapReduce上实现将RDS binlog实时同步到Kafka集群中。

2. 环境准备

实验中使用VPC网络环境,以下实例创建时默认都是在VPC环境下。

2.1 准备一个测试RDS数据库

创建一个RDS实例,版本选择5.7。这里不赘述如何创建RDS,详细流程请参考RDS文档。创建完如图:
image

2.2 准备一个Kafka集群

创建一个E-MapReduce Kafka集群,版本选择EMR-3.11.0。需要注意,这里必须选择EMR-3.11.0以上版本,否则不会默认安装启动Kafka Connect服务。详细创建流程请参考E-MapReduce文档。创建完如图:
image

注意:RDS实例和E-MapReduce Kafka集群最好在同一个VPC中,否则需要打通两个VPC之间的网络。

3. Kafka Connect

3.1 Connector

Kafka Connect是一个用于Kafka和其他数据系统之间进行数据传输的工具,它可以实现基于Kafka的数据管道,打通上下游数据源。我们需要做的就是在Kafka Connect服务上运行一个Connector,这个Connector是具体实现如何从/向数据源中读/写数据。Confluent提供了很多Connector实现,你可以在这里下载。不过今天我们使用Debezium提供的一个MySQL Connector插件,下载地址

  • 下载这个插件,并将解压出来的jar包全部拷贝到kafka lib目录下。注意:需要将这些jar包拷贝到Kafka集群所有机器上。
  • 在Kafka集群的服务列表中重启Kafka Connect组件。
    image

3.2 启动Connector

在创建connector前,我们需要做一番配置,这里罗列一些Debezium MySQL Connector的主要配置项:

database.hostname=x.x.x.x database.port=3306 database.user=tom database.password=password database.server.id=123456 database.server.name=fullfillment database.whitelist=inventory database.history.kafka.bootstrap.servers=y.y.y.y:9092 database.history.kafka.topic=dbhistory.fullfillment include.schema.changes=true

登录到Kafka集群,配置并创建一个connector,命令如下:

curl -X POST -H "Content-Type: application/json" --data '{"name": "rds-binlog", "config": {"connector.class":"io.debezium.connector.mysql.MySqlConnector", "database.hostname": "x.x.x.x", "database.port": "3306", "database.user": "tom", "database.password": "password", "database.server.id": "123456", "database.server.name": "fulfillment", "database.history.kafka.bootstrap.servers": "y.y.y.y:9092", "database.history.kafka.topic": "dbhistory.fullfillment", "include.schema.changes": "true"}}' http://emr-worker-1:8083/connectors

这时,我们可以看到一个创建好的connector,如图:
image

3.3 注意事项

  • server_id是多少?:你可以在RDS执行"SELECT @@server_id;"查到。
  • 创建connector时可能会出现连接失败,请确保RDS的白名单已经授权了Kafka集群机器访问。

4 测试

4.1 创建一张表

image

一会之后,Kafka集群中会自动创建一个对应的topic
image

插入几条数据

image

查看binlog数据

查看fulfillment.mugen.students这个topic,是否有刚刚新插入的数据

kafka-console-consumer.sh --zookeeper emr-header-1:2181/kafka-1.0.1 --topic fulfillment.mugen.students --from-beginning

结果如图所示:

image

5. 资料

目录
相关文章
|
6月前
|
缓存 NoSQL 关系型数据库
美团面试:MySQL有1000w数据,redis只存20w的数据,如何做 缓存 设计?
美团面试:MySQL有1000w数据,redis只存20w的数据,如何做 缓存 设计?
美团面试:MySQL有1000w数据,redis只存20w的数据,如何做 缓存 设计?
|
4月前
|
SQL 人工智能 关系型数据库
如何实现MySQL百万级数据的查询?
本文探讨了在MySQL中对百万级数据进行排序分页查询的优化策略。面对五百万条数据,传统的浅分页和深分页查询效率较低,尤其深分页因偏移量大导致性能显著下降。通过为排序字段添加索引、使用联合索引、手动回表等方法,有效提升了查询速度。最终建议根据业务需求选择合适方案:浅分页可加单列索引,深分页推荐联合索引或子查询优化,同时结合前端传递最后一条数据ID的方式实现高效翻页。
230 0
|
1月前
|
NoSQL 算法 Redis
【Docker】(3)学习Docker中 镜像与容器数据卷、映射关系!手把手带你安装 MySql主从同步 和 Redis三主三从集群!并且进行主从切换与扩容操作,还有分析 哈希分区 等知识点!
Union文件系统(UnionFS)是一种**分层、轻量级并且高性能的文件系统**,它支持对文件系统的修改作为一次提交来一层层的叠加,同时可以将不同目录挂载到同一个虚拟文件系统下(unite several directories into a single virtual filesystem) Union 文件系统是 Docker 镜像的基础。 镜像可以通过分层来进行继承,基于基础镜像(没有父镜像),可以制作各种具体的应用镜像。
247 5
|
3月前
|
存储 关系型数据库 MySQL
在CentOS 8.x上安装Percona Xtrabackup工具备份MySQL数据步骤。
以上就是在CentOS8.x上通过Perconaxtabbackup工具对Mysql进行高效率、高可靠性、无锁定影响地实现在线快速全量及增加式数据库资料保存与恢复流程。通过以上流程可以有效地将Mysql相关资料按需求完成定期或不定期地保存与灾难恢复需求。
266 10
|
4月前
|
SQL 存储 缓存
MySQL 如何高效可靠处理持久化数据
本文详细解析了 MySQL 的 SQL 执行流程、crash-safe 机制及性能优化策略。内容涵盖连接器、分析器、优化器、执行器与存储引擎的工作原理,深入探讨 redolog 与 binlog 的两阶段提交机制,并分析日志策略、组提交、脏页刷盘等关键性能优化手段,帮助提升数据库稳定性与执行效率。
119 0
|
7月前
|
关系型数据库 MySQL Linux
在Linux环境下备份Docker中的MySQL数据并传输到其他服务器以实现数据级别的容灾
以上就是在Linux环境下备份Docker中的MySQL数据并传输到其他服务器以实现数据级别的容灾的步骤。这个过程就像是一场接力赛,数据从MySQL数据库中接力棒一样传递到备份文件,再从备份文件传递到其他服务器,最后再传递回MySQL数据库。这样,即使在灾难发生时,我们也可以快速恢复数据,保证业务的正常运行。
317 28
|
6月前
|
存储 SQL 缓存
mysql数据引擎有哪些
MySQL 提供了多种存储引擎,每种引擎都有其独特的特点和适用场景。以下是一些常见的 MySQL 存储引擎及其特点:
165 0
|
2月前
|
SQL 运维 关系型数据库
深入探讨MySQL的二进制日志(binlog)选项
总结而言,对MySQL binlogs深度理解并妥善配置对数据库运维管理至关重要;它不仅关系到系统性能优化也是实现高可靠性架构设计必须考虑因素之一。通过精心规划与周密部署可以使得该机能充分发挥作用而避免潜在风险带来影响。
93 6
|
3月前
|
存储 SQL 关系型数据库
MySQL中binlog、redolog与undolog的不同之处解析
每个都扮演回答回溯与错误修正机构角色: BinLog像历史记载员详细记载每件大大小小事件; RedoLog则像紧急救援队伍遇见突發情況追踪最后活动轨迹尽力补救; UndoLog就类似时间机器可倒带历史让一切归位原始样貌同时兼具平行宇宙观察能让多人同时看见各自期望看见历程而互不干扰.
190 9
|
4月前
|
存储 SQL 关系型数据库
MySQL的Redo Log与Binlog机制对照分析
通过合理的配置和细致的管理,这两种日志机制相互配合,能够有效地提升MySQL数据库的可靠性和稳定性。
162 10

推荐镜像

查看更多
下一篇