Db2 CDC Connector #
The Db2 CDC connector allows for reading snapshot data and incremental data from a Db2 database. This document describes how to set up the Db2 CDC connector to run SQL queries against Db2 databases.
Supported Databases #
| Connector | Database | Driver |
|---|---|---|
| Db2-cdc | Db2: 11.5.0.0 | Db2 Driver: 11.5.0.0 |
Dependencies #
To set up the Db2 CDC connector, use the dependency information below. It covers both projects that use a build automation tool (such as Maven or SBT) and the SQL Client with SQL JAR bundles.
Maven dependency #
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-db2-cdc</artifactId>
    <version>3.5.0</version>
</dependency>
```

SQL Client JAR #
Download flink-sql-connector-db2-cdc-3.5.0.jar and put it under <FLINK_HOME>/lib/.
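Note: Refer to flink-sql-connector-db2-cdc; more released versions are available in the Maven Central repository.
Because the IPLA license used by the Db2 connector is incompatible with the Flink CDC project, we cannot provide the Db2 connector in the JAR package. You may need to configure the following dependency manually:
| Dependency | Description |
|---|---|
| com.ibm.db2.jcc:db2jcc:db2jcc4 | Used for connecting to the Db2 database. |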
Setup Db2 server #
Follow the setup steps in the Debezium Db2 Connector documentation.
Notes #
BOOLEAN type is not supported in SQL Replication on Db2 #
Only snapshots can be taken from tables with BOOLEAN type columns. Currently, SQL Replication on Db2 does not support BOOLEAN, so Debezium cannot perform CDC on those tables. Consider replacing the BOOLEAN type with another type.
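As a minimal sketch of that workaround, the Db2 DDL below stores a flag as a SMALLINT constrained to 0 and 1 instead of using BOOLEAN. The schema, table, column, and constraint names (MYSCHEMA.FEATURE_FLAGS, IS_ENABLED, CK_IS_ENABLED) are hypothetical, and whether SMALLINT is the right substitute depends on your application:

```sql
-- Hypothetical table: a 0/1 SMALLINT stands in for a BOOLEAN column,
-- avoiding the type that SQL Replication does not support.
CREATE TABLE MYSCHEMA.FEATURE_FLAGS (
    ID         INTEGER      NOT NULL PRIMARY KEY,
    NAME       VARCHAR(255) NOT NULL,
    IS_ENABLED SMALLINT     NOT NULL DEFAULT 0
        CONSTRAINT CK_IS_ENABLED CHECK (IS_ENABLED IN (0, 1))
);
```

On the Flink side, such a column would be declared as SMALLINT, following the data type mapping at the end of this page.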
How to create a Db2 CDC table #
The Db2 CDC table can be defined as follows:

```sql
-- checkpoint every 3 seconds
Flink SQL> SET 'execution.checkpointing.interval' = '3s';

-- register a Db2 table 'products' in Flink SQL
Flink SQL> CREATE TABLE products (
     ID INT NOT NULL,
     NAME STRING,
     DESCRIPTION STRING,
     WEIGHT DECIMAL(10,3),
     PRIMARY KEY(ID) NOT ENFORCED
     ) WITH (
     'connector' = 'db2-cdc',
     'hostname' = 'localhost',
     'port' = '50000',
     'username' = 'root',
     'password' = '123456',
     'database-name' = 'mydb',
     'table-name' = 'myschema.products');

-- read snapshot and redo logs from products table
Flink SQL> SELECT * FROM products;
```

Connector Options #
| Option | Required | Default | Type | Description |
|---|---|---|---|---|
| connector | required | (none) | String | Specifies which connector to use; here it should be 'db2-cdc'. |
| hostname | required | (none) | String | IP address or hostname of the Db2 database server. |
| username | required | (none) | String | Name of the Db2 user to use when connecting to the Db2 database server. |
| password | required | (none) | String | Password to use when connecting to the Db2 database server. |
| database-name | required | (none) | String | Database name of the Db2 server to monitor. |
| table-name | required | (none) | String | Table name of the Db2 database to monitor, e.g.: "db1.table1" |
| port | optional | 50000 | Integer | Integer port number of the Db2 database server. |
| scan.startup.mode | optional | initial | String | Optional startup mode for Db2 CDC consumer, valid enumerations are "initial" and "latest-offset". Please see Startup Reading Position section for more detailed information. |
| server-time-zone | optional | (none) | String | The session time zone of the database server, e.g. "Asia/Shanghai". It controls how the TIMESTAMP type in Db2 is converted to STRING. If not set, ZoneId.systemDefault() is used to determine the server time zone. |
| scan.incremental.snapshot.enabled | optional | true | Boolean | Whether to enable the parallel (incremental) snapshot. |
| chunk-meta.group.size | optional | 1000 | Integer | The group size of chunk meta, if the meta size exceeds the group size, the meta will be divided into multiple groups. |
| chunk-key.even-distribution.factor.lower-bound | optional | 0.05d | Double | The lower bound of the chunk key distribution factor. The distribution factor is used to determine whether the table is evenly distributed. Table chunks are computed with the even-distribution optimization when the data is evenly distributed, and a splitting query is issued when it is not. The distribution factor can be calculated as (MAX(id) - MIN(id) + 1) / rowCount. |
| chunk-key.even-distribution.factor.upper-bound | optional | 1000.0d | Double | The upper bound of the chunk key distribution factor. The distribution factor is used to determine whether the table is evenly distributed. Table chunks are computed with the even-distribution optimization when the data is evenly distributed, and a splitting query is issued when it is not. The distribution factor can be calculated as (MAX(id) - MIN(id) + 1) / rowCount. |
| scan.incremental.snapshot.chunk.key-column | optional | (none) | String | The chunk key of the table snapshot. Captured tables are split into multiple chunks by the chunk key when the table snapshot is read. By default, the chunk key is the first column of the primary key. A column that is not part of the primary key can be used as a chunk key, but this may lead to slower query performance. |
| debezium.* | optional | (none) | String | Passes Debezium properties through to the Debezium Embedded Engine, which is used to capture data changes from the Db2 server. For example: 'debezium.snapshot.mode' = 'never'. See the Debezium Db2 Connector properties for more. |
| scan.incremental.close-idle-reader.enabled | optional | false | Boolean | Whether to close idle readers at the end of the snapshot phase. This requires Flink 1.14 or later with 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' set to true. From Flink 1.15 on, that option defaults to true, so it does not need to be configured explicitly. |
| scan.incremental.snapshot.unbounded-chunk-first.enabled | optional | true | Boolean | Whether to assign the unbounded chunks first during snapshot reading phase. This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk. |
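To get a rough idea of how a table compares against the chunk-key.even-distribution.factor bounds, the distribution factor formula from the option descriptions above can be evaluated directly on Db2. The query below is a sketch that assumes a numeric chunk key named ID in the example table MYSCHEMA.PRODUCTS; substitute your own table and chunk key column:

```sql
-- Distribution factor = (MAX(id) - MIN(id) + 1) / rowCount
-- CAST to DOUBLE avoids integer division; NULLIF yields NULL for an empty table
-- instead of a division-by-zero error.
SELECT CAST(MAX(ID) - MIN(ID) + 1 AS DOUBLE) / NULLIF(COUNT(*), 0) AS DISTRIBUTION_FACTOR
FROM MYSCHEMA.PRODUCTS;
```

For example, a table with 1,000 rows whose IDs run from 1 to 1000 has a factor of (1000 - 1 + 1) / 1000 = 1.0, which lies between the default bounds of 0.05 and 1000.0. A table with 1,000 rows whose IDs range from 1 to 10,000,000 has a factor of 10,000, which exceeds the default upper bound.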
Available Metadata #
The following format metadata can be exposed as read-only (VIRTUAL) columns in a table definition.
| Key | DataType | Description |
|---|---|---|
| table_name | STRING NOT NULL | Name of the table that contains the row. |
| schema_name | STRING NOT NULL | Name of the schema that contains the row. |
| database_name | STRING NOT NULL | Name of the database that contains the row. |
| op_ts | TIMESTAMP_LTZ(3) NOT NULL | The time at which the change was made in the database. If the record is read from the snapshot of the table instead of the change stream, the value is always 0. |
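As an illustration, the table definition below extends the products example from earlier on this page with the four metadata keys. The column names db_name, schema_name, table_name, and operation_ts are arbitrary choices for this sketch; only the keys after METADATA FROM come from the table above:

```sql
CREATE TABLE products (
    -- read-only metadata columns, filled in by the connector
    db_name STRING METADATA FROM 'database_name' VIRTUAL,
    schema_name STRING METADATA FROM 'schema_name' VIRTUAL,
    table_name STRING METADATA FROM 'table_name' VIRTUAL,
    operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
    -- physical columns of the captured Db2 table
    ID INT NOT NULL,
    NAME STRING,
    DESCRIPTION STRING,
    WEIGHT DECIMAL(10,3),
    PRIMARY KEY(ID) NOT ENFORCED
) WITH (
    'connector' = 'db2-cdc',
    'hostname' = 'localhost',
    'port' = '50000',
    'username' = 'root',
    'password' = '123456',
    'database-name' = 'mydb',
    'table-name' = 'myschema.products'
);
```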
Features #
Startup Reading Position #
The config option scan.startup.mode specifies the startup mode for the Db2 CDC consumer. The valid enumerations are:
- initial (default): Performs an initial snapshot on the monitored database tables upon first startup, and then continues to read the latest redo logs.
- latest-offset: Never performs a snapshot on the monitored database tables upon first startup; it only reads from the end of the redo logs, which means it only captures changes made since the connector was started.
Note: The scan.startup.mode option relies on Debezium's snapshot.mode configuration, so do not use the two together. If you specify both scan.startup.mode and debezium.snapshot.mode in the table DDL, scan.startup.mode may not take effect.
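For instance, a table that skips the initial snapshot and only reads new changes could be declared as in the sketch below. The table name products_changes and the connection values are placeholders; only the scan.startup.mode option and its value come from the description above:

```sql
CREATE TABLE products_changes (
    ID INT NOT NULL,
    NAME STRING,
    PRIMARY KEY(ID) NOT ENFORCED
) WITH (
    'connector' = 'db2-cdc',
    'hostname' = 'localhost',
    'port' = '50000',
    'username' = 'root',
    'password' = '123456',
    'database-name' = 'mydb',
    'table-name' = 'myschema.products',
    -- skip the snapshot and start from the end of the redo logs;
    -- do not combine this with 'debezium.snapshot.mode'
    'scan.startup.mode' = 'latest-offset'
);
```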
DataStream Source #
```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.cdc.connectors.db2.Db2Source;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;

public class Db2SourceExample {
    public static void main(String[] args) throws Exception {
        SourceFunction<String> db2Source =
                Db2Source.<String>builder()
                        .hostname("yourHostname")
                        .port(50000)
                        .database("yourDatabaseName") // set captured database
                        .tableList("yourSchemaName.yourTableName") // set captured table
                        .username("yourUsername")
                        .password("yourPassword")
                        // converts SourceRecord to JSON String
                        .deserializer(new JsonDebeziumDeserializationSchema())
                        .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // enable checkpoint
        env.enableCheckpointing(3000);

        env.addSource(db2Source)
                .print()
                .setParallelism(1); // use parallelism 1 for sink to keep message ordering

        env.execute("Print Db2 Snapshot + Change Stream");
    }
}
```

Available Metrics #
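The metrics system helps you track the progress of split assignment. The supported Flink metrics are listed below:
| Group | Name | Type | Description |
|---|---|---|---|
| namespace.schema.table | isSnapshotting | Gauge | Whether the table is in the snapshot reading phase |
| namespace.schema.table | isStreamReading | Gauge | Whether the table is in the incremental reading phase |
| namespace.schema.table | numTablesSnapshotted | Gauge | Number of tables whose snapshot reading has completed |
| namespace.schema.table | numTablesRemaining | Gauge | Number of tables that have not yet been snapshotted |
| namespace.schema.table | numSnapshotSplitsProcessed | Gauge | Number of splits currently being processed |
| namespace.schema.table | numSnapshotSplitsRemaining | Gauge | Number of splits that have not yet been processed |
| namespace.schema.table | numSnapshotSplitsFinished | Gauge | Number of splits that have finished processing |
| namespace.schema.table | snapshotStartTime | Gauge | Time at which the snapshot reading phase started |
| namespace.schema.table | snapshotEndTime | Gauge | Time at which the snapshot reading phase ended |
Notes:
- The Group name is namespace.schema.table, where namespace is the actual database name, schema is the actual schema name, and table is the actual table name.
- For Db2, the Group name looks like test_database.test_schema.test_table.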
The Db2 CDC incremental connector (since 3.1.0) can be used as follows:

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.cdc.connectors.base.options.StartupOptions;
import org.apache.flink.cdc.connectors.db2.source.Db2SourceBuilder;
import org.apache.flink.cdc.connectors.db2.source.Db2SourceBuilder.Db2IncrementalSource;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;

public class Db2ParallelSourceExample {

    public static void main(String[] args) throws Exception {
        Db2IncrementalSource<String> db2Source =
                new Db2SourceBuilder<String>()
                        .hostname("localhost")
                        .port(50000)
                        .databaseList("TESTDB")
                        .tableList("DB2INST1.CUSTOMERS")
                        .username("flink")
                        .password("flinkpw")
                        .deserializer(new JsonDebeziumDeserializationSchema())
                        .startupOptions(StartupOptions.initial())
                        .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // enable checkpoint
        env.enableCheckpointing(3000);

        // set the source parallelism to 2
        env.fromSource(db2Source, WatermarkStrategy.noWatermarks(), "Db2IncrementalSource")
                .setParallelism(2)
                .print()
                .setParallelism(1); // use parallelism 1 for sink to keep message ordering

        env.execute("Print DB2 Snapshot + Change Stream");
    }
}
```

Tables Without Primary Keys #
Starting from version 3.4.0, Db2 CDC supports tables that do not have a primary key. To use a table without primary keys, you must configure the scan.incremental.snapshot.chunk.key-column option and specify one non-null field.
Two points need attention:
- If the table has an index, try to use a column contained in that index as the scan.incremental.snapshot.chunk.key-column. This speeds up the select statements.
- The processing semantics of a Db2 CDC table without primary keys depend on the behavior of the column specified by scan.incremental.snapshot.chunk.key-column.
  - If no update operation is performed on the specified column, exactly-once semantics are ensured.
  - If update operations are performed on the specified column, only at-least-once semantics are ensured. However, you can specify primary keys downstream and perform idempotent operations to ensure data correctness.
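A table without a primary key could be declared along the lines of the sketch below. The table and column names (order_events, ORDER_ID, EVENT_TYPE, EVENT_TIME) and the connection values are hypothetical; ORDER_ID is assumed to be a non-null, indexed column that is never updated:

```sql
-- No PRIMARY KEY clause, so the chunk key column is set explicitly.
CREATE TABLE order_events (
    ORDER_ID INT NOT NULL,
    EVENT_TYPE STRING,
    EVENT_TIME TIMESTAMP(3)
) WITH (
    'connector' = 'db2-cdc',
    'hostname' = 'localhost',
    'port' = '50000',
    'username' = 'root',
    'password' = '123456',
    'database-name' = 'mydb',
    'table-name' = 'myschema.order_events',
    -- non-null column used to split the snapshot into chunks
    'scan.incremental.snapshot.chunk.key-column' = 'ORDER_ID'
);
```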
Warning #
Using a non-primary key column as the scan.incremental.snapshot.chunk.key-column for a Db2 table with primary keys may lead to data inconsistencies. Below is a scenario illustrating this issue and recommendations to mitigate potential problems.
Problem Scenario #
- Table Structure:
  - Primary Key: id
  - Chunk Key Column: pid (not a primary key)
- Snapshot Splits:
  - Split 0: 1 < pid <= 3
  - Split 1: 3 < pid <= 5
- Operation:
  - Two different subtasks are reading Split 0 and Split 1 concurrently.
  - An update operation changes pid from 2 to 4 for id=0 while both splits are being read. This update occurs between the low and high watermark of both splits.
- Result:
  - Split 0: Contains the record [id=0, pid=2]
  - Split 1: Contains the record [id=0, pid=4]
Since the order of processing these records cannot be guaranteed, the final value of pid for id=0 may end up being either 2 or 4, leading to potential data inconsistencies.
Data Type Mapping #
| Db2 type | Flink SQL type | NOTE |
|---|---|---|
| SMALLINT | SMALLINT | |
| INTEGER | INT | |
| BIGINT | BIGINT | |
| REAL | FLOAT | |
| DOUBLE | DOUBLE | |
| NUMERIC(p, s), DECIMAL(p, s) | DECIMAL(p, s) | |
| DATE | DATE | |
| TIME | TIME | |
| TIMESTAMP [(p)] | TIMESTAMP [(p)] | |
| CHARACTER(n) | CHAR(n) | |
| VARCHAR(n) | VARCHAR(n) | |
| BINARY(n) | BINARY(n) | |
| VARBINARY(n) | VARBINARY(n) | |
| BLOB, CLOB, DBCLOB | BYTES | |
| VARGRAPHIC, XML | STRING | |