OceanBase

OceanBase CDC 连接器 #

OceanBase CDC 连接器允许从 OceanBase 读取快照数据和增量数据。本文介绍了如何设置 OceanBase CDC 连接器以对 OceanBase 进行 SQL 查询。

OceanBase CDC 方案 #

名词解释:

  • OceanBase CE: OceanBase 社区版。OceanBase 的开源版本,兼容 MySQL https://github.com/oceanbase/oceanbase
  • OceanBase EE: OceanBase 企业版。OceanBase 的商业版本,支持 MySQL 和 Oracle 两种兼容模式 https://www.oceanbase.com
  • OceanBase Cloud: OceanBase 云数据库 https://www.oceanbase.com/product/cloud
  • Binlog Service CE: OceanBase Binlog 服务社区版。OceanBase 社区版的一个兼容 MySQL 复制协议的解决方案,详情见文档
  • Binlog Service EE: OceanBase Binlog 服务企业版。OceanBase 企业版 MySQL 模式的一个兼容 MySQL 复制协议的解决方案,仅可在阿里云使用,详情见操作指南
  • MySQL Driver: mysql-connector-java,可用于 OceanBase 社区版和 OceanBase 企业版 MySQL 模式。
  • OceanBase Driver: OceanBase JDBC 驱动,支持所有版本的 MySQL 和 Oracle 兼容模式 https://github.com/oceanbase/obconnector-j

OceanBase CDC 源端读取方案:

数据库类型 支持的驱动 CDC 连接器 其他用到的组件
OceanBase CE MySQL Driver: 8.0.x
OceanBase CDC Connector Binlog Service CE
MySQL Driver: 8.0.x MySQL CDC Connector Binlog Service CE
OceanBase EE (MySQL 模式) MySQL Driver: 8.0.x
OceanBase CDC Connector Binlog Service EE
MySQL Driver: 8.0.x MySQL CDC Connector Binlog Service EE
OceanBase EE (Oracle 模式) 暂不支持 OceanBase Oracle 兼容模式下的增量订阅服务,请联系企业技术支持。

注意事项: oceanbase-cdc 连接器 从 3.5 版本开始进行了以下大的改动:

  • 先前基于 OceanBase Log Proxy 服务实现的连接器已被正式移除,当前将仅支持连接到 OceanBase Binlog 服务。
  • 当前版本 oceanbase-cdc 将基于 mysql-cdc 连接器实现,主要改进了对 OceanBase Binlog 服务的兼容性,包含一些 Bug 修复,推荐使用。
  • 由于 OceanBase Binlog 服务兼容 MySQL 复制协议,仍然支持使用 MySQL CDC 连接器连接到 OceanBase Binlog 服务。
  • 暂不支持 OceanBase Oracle 兼容模式下的增量订阅服务,请联系企业技术支持。

依赖 #

为了使用 OceanBase CDC 连接器,您必须提供相关的依赖信息。以下依赖信息适用于使用自动构建工具(如 Maven 或 SBT)构建的项目和带有 SQL JAR 包的 SQL 客户端。

Maven dependency #

<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-oceanbase-cdc</artifactId> <version>3.5.0</version> </dependency>

SQL Client JAR #

下载链接仅在已发布版本可用,请在文档网站左下角选择浏览已发布的版本。

下载flink-sql-connector-oceanbase-cdc-3.5.0.jar<FLINK_HOME>/lib/ 目录下。

注意: 参考 flink-sql-connector-oceanbase-cdc 当前已发布的所有版本都可以在 Maven 中央仓库获取。

由于 MySQL Driver 使用的开源协议与 Flink CDC 项目不兼容,我们无法在 jar 包中提供驱动。 您可能需要手动配置以下依赖:

依赖名称 说明
mysql:mysql-connector-java:8.0.27 用于连接到 OceanBase 数据库的 MySQL 租户。

部署 OceanBase 数据库和 Binlog 服务 #

  1. 按照 文档 部署 OceanBase 数据库。
  2. 按照 文档 部署 OceanBase Binlog 服务。

使用文档 #

注意事项

  • 当前版本 oceanbase-cdc 连接器基于 mysql-cdc 连接器实现,只修改了部分内部实现。外部接口、使用参数等 和 mysql-cdc 基本保持一致,使用文档也可参考MySQL CDC 使用文档

创建 OceanBase CDC 表 #

OceanBase CDC 表可以定义如下:

-- 每 3 秒做一次 checkpoint,用于测试,生产配置建议5到10分钟 Flink SQL> SET 'execution.checkpointing.interval' = '3s';  -- 在 Flink SQL中注册 OceanBase 表 'orders' Flink SQL> CREATE TABLE orders (  order_id INT,  order_date TIMESTAMP(0),  customer_name STRING,  price DECIMAL(10, 5),  product_id INT,  order_status BOOLEAN,  PRIMARY KEY(order_id) NOT ENFORCED  ) WITH (  'connector' = 'oceanbase-cdc',  'hostname' = 'localhost',  'port' = '2881',  'username' = 'root',  'password' = '123456',  'database-name' = 'mydb',  'table-name' = 'orders');  -- 从订单表读取全量数据(快照)和增量数据(binlog) Flink SQL> SELECT * FROM orders; 

连接器选项 #

Option Required Default Type Description
connector required (none) String 指定要使用的连接器, 这里应该是 'oceanbase-cdc'.
hostname required (none) String OceanBase 服务器的 IP 地址或主机名。
username required (none) String 连接到 OceanBase 数据库服务器时要使用的 OceanBase 用户的名称。
password required (none) String 连接 OceanBase 数据库服务器时使用的密码。
database-name required (none) String 要监视的 OceanBase 服务器的数据库名称。数据库名称还支持正则表达式,以监视多个与正则表达式匹配的表。
table-name required (none) String 需要监视的 OceanBase 数据库的表名。表名支持正则表达式,以监视满足正则表达式的多个表。注意:OceanBase CDC 连接器在正则匹配表名时,会把用户填写的 database-name, table-name 通过字符串 `\\.` 连接成一个全路径的正则表达式,然后使用该正则表达式和 OceanBase 数据库中表的全限定名进行正则匹配。
port optional 2881 Integer OceanBase 数据库服务器的整数端口号。
server-id optional (none) String 读取数据使用的 server id,server id 可以是个整数或者一个整数范围,比如 '5400' 或 '5400-5408', 建议在 'scan.incremental.snapshot.enabled' 参数为启用时,配置成整数范围。 默认情况下,连接器会在 5400 和 6400 之间生成一个随机数,但是我们建议用户明确指定 Server id。
scan.incremental.snapshot.enabled optional true Boolean 增量快照是一种读取表快照的新机制,与旧的快照机制相比, 增量快照有许多优点,包括: (1)在快照读取期间,Source 支持并发读取, (2)在快照读取期间,Source 支持进行 chunk 粒度的 checkpoint, (3)在快照读取之前,Source 不需要数据库锁权限。 如果希望 Source 并行运行,则每个并行 Readers 都应该具有唯一的 Server id,所以 Server id 必须是类似 `5400-6400` 的范围,并且该范围必须大于并行度。 请查阅 增量快照读取 章节了解更多详细信息。
scan.incremental.snapshot.chunk.size optional 8096 Integer 表快照的块大小(行数),读取表的快照时,捕获的表被拆分为多个块。
scan.snapshot.fetch.size optional 1024 Integer 读取表快照时每次读取数据的最大条数。
scan.incremental.snapshot.chunk.key-column optional (none) String 表快照的分片键,在读取表的快照时,被捕获的表会按分片键拆分为多个分片。 默认情况下,分片键是主键的第一列。可以使用非主键列作为分片键,但这可能会导致查询性能下降。
警告: 使用非主键列作为分片键可能会导致数据不一致。请参阅 增量快照读取 章节了解详细信息。
scan.startup.mode optional initial String OceanBase CDC 消费者可选的启动模式, 合法的模式为 "initial","earliest-offset","latest-offset","specific-offset","timestamp" 和 "snapshot"。 请查阅 启动模式 章节了解更多详细信息。
scan.startup.specific-offset.file optional (none) String 在 "specific-offset" 启动模式下,启动位点的 binlog 文件名。
scan.startup.specific-offset.pos optional (none) Long 在 "specific-offset" 启动模式下,启动位点的 binlog 文件位置。
scan.startup.specific-offset.gtid-set optional (none) String 在 "specific-offset" 启动模式下,启动位点的 GTID 集合。
scan.startup.timestamp-millis optional (none) Long 在 "timestamp" 启动模式下,启动位点的毫秒时间戳。
scan.startup.specific-offset.skip-events optional (none) Long 在指定的启动位点后需要跳过的事件数量。
scan.startup.specific-offset.skip-rows optional (none) Long 在指定的启动位点后需要跳过的数据行数量。
server-time-zone optional (none) String 数据库服务器中的会话时区, 例如: "Asia/Shanghai". 它控制 OceanBase 中的时间戳类型如何转换为字符串。 更多请参考 这里. 如果没有设置,则使用ZoneId.systemDefault()来确定服务器时区。
debezium.min.row. count.to.stream.result optional 1000 Integer 在快照操作期间,连接器将查询每个包含的表,以生成该表中所有行的读取事件。 此参数确定 OceanBase 连接是否将表的所有结果拉入内存(速度很快,但需要大量内存), 或者结果是否需要流式传输(传输速度可能较慢,但适用于非常大的表)。 该值指定了在连接器对结果进行流式处理之前,表必须包含的最小行数,默认值为1000。将此参数设置为`0`以跳过所有表大小检查,并始终在快照期间对所有结果进行流式处理。
connect.timeout optional 30s Duration 连接器在尝试连接到 OceanBase 数据库服务器后超时前应等待的最长时间。该时长不能少于250毫秒。
connect.max-retries optional 3 Integer 连接器应重试以建立 OceanBase 数据库服务器连接的最大重试次数。
connection.pool.size optional 20 Integer 连接池大小。
jdbc.properties.* optional 20 String 传递自定义 JDBC URL 属性的选项。用户可以传递自定义属性,如 'jdbc.properties.useSSL' = 'false'.
heartbeat.interval optional 30s Duration 用于跟踪最新可用 binlog 偏移的发送心跳事件的间隔。
debezium.* optional (none) String 将 Debezium 的属性传递给 Debezium 嵌入式引擎,该引擎用于从 OceanBase 服务器捕获数据更改。 For example: 'debezium.snapshot.mode' = 'never'. 查看更多关于 Debezium 的 MySQL 连接器属性
scan.incremental.close-idle-reader.enabled optional false Boolean 是否在快照结束后关闭空闲的 Reader。 此特性需要 flink 版本大于等于 1.14 并且 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' 需要设置为 true。
scan.parse.online.schema.changes.enabled optional false Boolean 是否尝试解析由 gh-ostpt-osc 工具生成的表结构变更事件。 这些工具会在变更表结构时,将变更语句应用到“影子表”之上,并稍后将其与主表进行交换,以达到表结构变更的目的。
这是一项实验性功能。
scan.incremental.snapshot.unbounded-chunk-first.enabled optional false Boolean 快照读取阶段是否先分配 UnboundedChunk。
这有助于降低 TaskManager 在快照阶段同步最后一个chunk时遇到内存溢出 (OOM) 的风险。
这是一项实验特性,默认为 false。
scan.read-changelog-as-append-only.enabled optional false Boolean 是否将 changelog 数据流转换为 append-only 数据流。
仅在需要保存上游表删除消息等特殊场景下开启使用,比如在逻辑删除场景下,用户不允许物理删除下游消息,此时使用该特性,并配合 row_kind 元数据字段,下游可以先保存所有明细数据,再通过 row_kind 字段判断是否进行逻辑删除。
参数取值如下:
  • true:所有类型的消息(包括INSERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER)都会转换成 INSERT 类型的消息。
  • false(默认):所有类型的消息都保持原样下发。
  • scan.incremental.snapshot.backfill.skip optional false Boolean 是否在快照读取阶段跳过 backfill 。
    如果跳过 backfill ,快照阶段捕获表的更改将在稍后的 binlog 读取阶段被回放,而不是合并到快照中。
    警告:跳过 backfill 可能会导致数据不一致,因为快照阶段发生的某些 binlog 事件可能会被重放(仅保证 at-least-once )。 例如,更新快照阶段已更新的值,或删除快照阶段已删除的数据。这些重放的 binlog 事件应进行特殊处理。
    use.legacy.json.format optional true Boolean 是否使用 legacy JSON 格式来转换 Binlog 中的 JSON 类型的数据。
    这代表着是否使用 legacy JSON 格式来转换 Binlog 中的 JSON 类型的数据。 如果用户配置 'use.legacy.json.format' = 'true',则从 Binlog 中转换 JSON 类型的数据时,会移除值之前的空格和逗号之后的空格。例如, Binlog 中 JSON 类型的数据 {"key1": "value1", "key2": "value2"} 会被转换为 {"key1":"value1","key2":"value2"}。 如果设置 'use.legacy.json.format' = 'false', 这条数据会被转换为 {"key1": "value1", "key2": "value2"}, 也就是 key 和 value 前的空格都会被保留。

    支持的元数据 #

    下表中的元数据可以在 DDL 中作为只读(虚拟)meta 列声明。

    Key DataType Description
    table_name STRING NOT NULL 当前记录所属的表名称。
    database_name STRING NOT NULL 当前记录所属的库名称。
    op_ts TIMESTAMP_LTZ(3) NOT NULL 当前记录表在数据库中更新的时间。
    如果从表的快照而不是 binlog 读取记录,该值将始终为0。
    row_kind STRING NOT NULL 当前记录的变更类型。
    注意:如果 Source 算子选择为每条记录输出 row_kind 列,则下游 SQL 操作符在处理回撤时可能会由于此新添加的列而无法比较,导致出现非确定性更新问题。建议仅在简单的同步作业中使用此元数据列。
    '+I' 表示 INSERT 消息,'-D' 表示 DELETE 消息,'-U' 表示 UPDATE_BEFORE 消息,'+U' 表示 UPDATE_AFTER 消息。

    下述创建表示例展示元数据列的用法:

    CREATE TABLE products (  db_name STRING METADATA FROM 'database_name' VIRTUAL,  table_name STRING METADATA FROM 'table_name' VIRTUAL,  operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,  operation STRING METADATA FROM 'row_kind' VIRTUAL,  order_id INT,  order_date TIMESTAMP(0),  customer_name STRING,  price DECIMAL(10, 5),  product_id INT,  order_status BOOLEAN,  PRIMARY KEY(order_id) NOT ENFORCED ) WITH (  'connector' = 'oceanbase-cdc',  'hostname' = 'localhost',  'port' = '2881',  'username' = 'root',  'password' = '123456',  'database-name' = 'mydb',  'table-name' = 'orders' ); 

    下述创建表示例展示使用正则表达式匹配多张库表的用法:

    CREATE TABLE products (  db_name STRING METADATA FROM 'database_name' VIRTUAL,  table_name STRING METADATA FROM 'table_name' VIRTUAL,  operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,  operation STRING METADATA FROM 'row_kind' VIRTUAL,  order_id INT,  order_date TIMESTAMP(0),  customer_name STRING,  price DECIMAL(10, 5),  product_id INT,  order_status BOOLEAN,  PRIMARY KEY(order_id) NOT ENFORCED ) WITH (  'connector' = 'oceanbase-cdc',  'hostname' = 'localhost',  'port' = '2881',  'username' = 'root',  'password' = '123456',  'database-name' = '(^(test).*|^(tpc).*|txc|.*[p$]|t{2})',  'table-name' = '(t[5-8]|tt)' ); 
    匹配示例 表达式 描述
    前缀匹配 ^(test).* 匹配前缀为test的数据库名或表名,例如test1、test2等。
    后缀匹配 .*[p$] 匹配后缀为p的数据库名或表名,例如cdcp、edcp等。
    特定匹配 txc 匹配具体的数据库名或表名。

    进行库表匹配时,会使用正则表达式 database-name\\.table-name 来与OceanBase表的全限定名做匹配,所以该例子使用 (^(test).*|^(tpc).*|txc|.*[p$]|t{2})\\.(t[5-8]|tt),可以匹配到表 txc.tt、test2.test5。

    数据类型映射 #

    OceanBase type Flink SQL type NOTE
    TINYINT TINYINT
    SMALLINT
    TINYINT UNSIGNED
    TINYINT UNSIGNED ZEROFILL
    SMALLINT
    INT
    MEDIUMINT
    SMALLINT UNSIGNED
    SMALLINT UNSIGNED ZEROFILL
    INT
    BIGINT
    INT UNSIGNED
    INT UNSIGNED ZEROFILL
    MEDIUMINT UNSIGNED
    MEDIUMINT UNSIGNED ZEROFILL
    BIGINT
    BIGINT UNSIGNED
    BIGINT UNSIGNED ZEROFILL
    SERIAL
    DECIMAL(20, 0)
    FLOAT
    FLOAT UNSIGNED
    FLOAT UNSIGNED ZEROFILL
    FLOAT
    REAL
    REAL UNSIGNED
    REAL UNSIGNED ZEROFILL
    DOUBLE
    DOUBLE UNSIGNED
    DOUBLE UNSIGNED ZEROFILL
    DOUBLE PRECISION
    DOUBLE PRECISION UNSIGNED
    DOUBLE PRECISION UNSIGNED ZEROFILL
    DOUBLE
    NUMERIC(p, s)
    NUMERIC(p, s) UNSIGNED
    NUMERIC(p, s) UNSIGNED ZEROFILL
    DECIMAL(p, s)
    DECIMAL(p, s) UNSIGNED
    DECIMAL(p, s) UNSIGNED ZEROFILL
    FIXED(p, s)
    FIXED(p, s) UNSIGNED
    FIXED(p, s) UNSIGNED ZEROFILL
    where p <= 38
    DECIMAL(p, s)
    NUMERIC(p, s)
    NUMERIC(p, s) UNSIGNED
    NUMERIC(p, s) UNSIGNED ZEROFILL
    DECIMAL(p, s)
    DECIMAL(p, s) UNSIGNED
    DECIMAL(p, s) UNSIGNED ZEROFILL
    FIXED(p, s)
    FIXED(p, s) UNSIGNED
    FIXED(p, s) UNSIGNED ZEROFILL
    where 38 < p <= 65
    STRING 在 OceanBase MySQL 兼容模式中,十进制数据类型的精度高达 65,但在 Flink 中,十进制数据类型的精度仅限于 38。所以,如果定义精度大于 38 的十进制列,则应将其映射到字符串以避免精度损失。
    BOOLEAN
    TINYINT(1)
    BIT(1)
    BOOLEAN
    DATE DATE
    TIME [(p)] TIME [(p)]
    TIMESTAMP [(p)]
    DATETIME [(p)]
    TIMESTAMP [(p)]
    CHAR(n) CHAR(n)
    VARCHAR(n) VARCHAR(n)
    BIT(n) BINARY(⌈n/8⌉)
    BINARY(n) BINARY(n)
    VARBINARY(N) VARBINARY(N)
    TINYTEXT
    TEXT
    MEDIUMTEXT
    LONGTEXT
    STRING
    TINYBLOB
    BLOB
    MEDIUMBLOB
    LONGBLOB
    BYTES 目前,对于 BLOB 数据类型,仅支持长度不大于 2147483647(2**31-1)的 blob。
    YEAR INT
    ENUM STRING
    JSON STRING JSON 数据类型将在 Flink 中转换为 JSON 格式的字符串。
    SET ARRAY<STRING> 因为 SET 数据类型是一个字符串对象,可以有零个或多个值 它应该始终映射到字符串数组。
    GEOMETRY
    POINT
    LINESTRING
    POLYGON
    MULTIPOINT
    MULTILINESTRING
    MULTIPOLYGON
    GEOMETRYCOLLECTION
    STRING 空间数据类型将转换为具有固定 Json 格式的字符串。 请参考 空间数据类型映射 章节了解更多详细信息。

    空间数据类型映射 #

    GEOMETRYCOLLECTION之外的空间数据类型都会转换为 Json 字符串,格式固定,如:

    {"srid": 0 , "type": "xxx", "coordinates": [0, 0]} 

    字段srid标识定义几何体的 SRS,如果未指定 SRID,则 SRID 0 是新几何体值的默认值。

    字段type标识空间数据类型,例如POINT/LINESTRING/POLYGON

    字段coordinates表示空间数据的坐标

    对于GEOMETRYCOLLECTION,它将转换为 Json 字符串,格式固定,如:

    {"srid": 0 , "type": "GeometryCollection", "geometries": [{"type":"Point","coordinates":[10,10]}]} 

    Geometrics字段是一个包含所有空间数据的数组。

    不同空间数据类型映射的示例如下:

    Spatial data Json String converted in Flink
    POINT(1 1) {"coordinates":[1,1],"type":"Point","srid":0}
    LINESTRING(3 0, 3 3, 3 5) {"coordinates":[[3,0],[3,3],[3,5]],"type":"LineString","srid":0}
    POLYGON((1 1, 2 1, 2 2, 1 2, 1 1)) {"coordinates":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],"type":"Polygon","srid":0}
    MULTIPOINT((1 1),(2 2)) {"coordinates":[[1,1],[2,2]],"type":"MultiPoint","srid":0}
    MultiLineString((1 1,2 2,3 3),(4 4,5 5)) {"coordinates":[[[1,1],[2,2],[3,3]],[[4,4],[5,5]]],"type":"MultiLineString","srid":0}
    MULTIPOLYGON(((0 0, 10 0, 10 10, 0 10, 0 0)), ((5 5, 7 5, 7 7, 5 7, 5 5))) {"coordinates":[[[[0,0],[10,0],[10,10],[0,10],[0,0]]],[[[5,5],[7,5],[7,7],[5,7],[5,5]]]],"type":"MultiPolygon","srid":0}
    GEOMETRYCOLLECTION(POINT(10 10), POINT(30 30), LINESTRING(15 15, 20 20)) {"geometries":[{"type":"Point","coordinates":[10,10]},{"type":"Point","coordinates":[30,30]},{"type":"LineString","coordinates":[[15,15],[20,20]]}],"type":"GeometryCollection","srid":0}

    Back to top