DataWorks:Kafka data source

Last Updated:Aug 22, 2025

The Kafka data source provides a bidirectional channel to read data from and write data to Kafka. This topic describes the data synchronization capabilities that DataWorks provides for Kafka.

Supported versions

DataWorks supports Alibaba Cloud Kafka and self-managed Kafka versions from 0.10.2 to 3.6.x.

Note

Data synchronization is not supported for Kafka versions earlier than 0.10.2. This is because these versions do not support retrieving partition offsets and their data structures may not support timestamps.

Real-time read

  • If you use a subscription Serverless resource group, you must estimate the required specifications in advance to prevent task failures due to insufficient resources.

    Estimate 1 CU per topic. You must also estimate resources based on traffic:

    • For uncompressed Kafka data, estimate 1 CU for every 10 MB/s of traffic.

    • For compressed Kafka data, estimate 2 CU for every 10 MB/s of traffic.

    • For compressed Kafka data that requires JSON parsing, estimate 3 CU for every 10 MB/s of traffic.

  • When you use a subscription Serverless resource group or an old version of an exclusive resource group for Data Integration:

    • If your workload has a high tolerance for failover, the cluster slot usage should not exceed 80%.

    • If your workload has a low tolerance for failover, the cluster slot usage should not exceed 70%.

Note

The actual resource usage depends on factors such as the data content and format. You must adjust the resources based on the actual runtime conditions after your initial estimation.
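The estimation rules above can be sketched as a small calculation. This is only an illustration: the function name `estimate_cu` and the `data_kind` labels are hypothetical, and the sketch assumes that the per-topic estimate and the traffic-based estimate are added together, which this topic does not state explicitly.

```python
import math

# CUs per 10 MB/s of traffic, taken from the estimates listed above.
CU_PER_10_MBPS = {
    "uncompressed": 1,      # uncompressed Kafka data
    "compressed": 2,        # compressed Kafka data
    "compressed_json": 3,   # compressed Kafka data that requires JSON parsing
}


def estimate_cu(topic_count: int, traffic_mb_per_s: float, data_kind: str) -> int:
    """Return a rough initial CU estimate for a real-time read task.

    Assumption: the per-topic estimate (1 CU per topic) and the
    traffic-based estimate are summed, and the result is rounded up.
    """
    if data_kind not in CU_PER_10_MBPS:
        raise ValueError(f"unknown data_kind: {data_kind!r}")
    topic_cu = topic_count * 1
    traffic_cu = math.ceil(traffic_mb_per_s / 10 * CU_PER_10_MBPS[data_kind])
    return topic_cu + traffic_cu


# 2 topics, 25 MB/s of compressed data: 2 + ceil(2.5 * 2) = 7
print(estimate_cu(2, 25, "compressed"))
```

Treat the result as a starting point only and adjust it based on the actual runtime conditions.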

Limits

The Kafka data source supports Serverless resource groups (recommended) and old versions of exclusive resource groups for Data Integration.

Offline read from a single table

If both parameter.groupId and parameter.kafkaConfig.group.id are configured, parameter.groupId takes precedence over group.id in the kafkaConfig parameter.
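The precedence rule can be illustrated with a minimal sketch. The helper name `effective_group_id` is hypothetical, and the sketch assumes that an empty groupId counts as not configured.

```python
def effective_group_id(parameter: dict):
    """Return the consumer group ID that takes effect for a reader step.

    parameter.groupId takes precedence over group.id in kafkaConfig.
    Assumption: an empty or missing groupId is treated as not configured.
    """
    group_id = parameter.get("groupId")
    if group_id:
        return group_id
    return parameter.get("kafkaConfig", {}).get("group.id")


parameter = {"groupId": "group_a", "kafkaConfig": {"group.id": "group_b"}}
print(effective_group_id(parameter))  # group_a
```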

Real-time write to a single table

Data deduplication is not supported for write operations. If a task is restarted after an offset reset or a failover, duplicate data may be written.

Real-time write for an entire database

  • Real-time data synchronization tasks support Serverless resource groups (recommended) and old versions of exclusive resource groups for Data Integration.

  • If the source table has a primary key, the primary key value is used as the key for the Kafka record. This ensures that changes to the same primary key are written to the same Kafka partition in order.

  • If the source table does not have a primary key, you have two options:

    • If you select the option to synchronize tables without primary keys, the key for the Kafka record is empty. To ensure that table changes are written to Kafka in order, the destination Kafka topic must have only a single partition.

    • If you select a custom primary key, a combination of one or more non-primary key fields is used as the key for the Kafka record.

  • If the Kafka cluster returns an exception and you need to ensure that changes to the same primary key are written to the same Kafka partition in order, add the following configuration to the extended parameters of the Kafka data source.

    {"max.in.flight.requests.per.connection":1,"buffer.memory": 100554432}

    Important

    This configuration significantly degrades replication performance. You must balance performance with the need for strict ordering and reliability.

  • For more information about the overall format of messages written to Kafka in real-time synchronization, the format of heartbeat messages, and the format of messages corresponding to source data changes, see Appendix: Message format.

Supported field types

Kafka provides unstructured data storage. A Kafka record typically includes a key, value, offset, timestamp, headers, and partition. When DataWorks reads data from or writes data to Kafka, DataWorks processes the data based on the following policies.

Offline data read

When DataWorks reads data from Kafka, it can parse the data in JSON format. The following table describes how each data module is processed.

| Kafka record data module | Processed data type |
| --- | --- |
| key | Depends on the keyType configuration item in the data synchronization task. For more information about the keyType parameter, see the full parameter description in the appendix. |
| value | Depends on the valueType configuration item in the data synchronization task. For more information about the valueType parameter, see the full parameter description in the appendix. |
| offset | Long |
| timestamp | Long |
| headers | String |
| partition | Long |

Offline data write

When DataWorks writes data to Kafka, it supports writing data in JSON or text format. The data processing policy varies based on the data synchronization solution. The following tables describe the details.

Important
  • When data is written in text format, field names are not included. Field values are separated by a separator.

  • When data is written to Kafka in a real-time synchronization task, the built-in JSON format is used. The written data includes all information, such as database change messages, business time, and DDL information. For more information about the data format, see Appendix: Message format.

Synchronization task type

Format of the value written to Kafka

Source field type

Processing method for write operations

Offline synchronization

Offline synchronization node in DataStudio

json

String

UTF-8 encoded string

Boolean

Converted to the UTF-8 encoded string "true" or "false"

Time/Date

UTF-8 encoded string in yyyy-MM-dd HH:mm:ss format

Numeric

UTF-8 encoded numeric string

Byte stream

The byte stream is treated as a UTF-8 encoded string and converted to a string.

text

String

UTF-8 encoded string

Boolean

Converted to the UTF-8 encoded string "true" or "false"

Time/Date

UTF-8 encoded string in yyyy-MM-dd HH:mm:ss format

Numeric

UTF-8 encoded numeric string

Byte stream

The byte stream is treated as a UTF-8 encoded string and converted to a string.

Real-time synchronization: Real-time ETL to Kafka

Real-time synchronization node in DataStudio

json

String

UTF-8 encoded string

Boolean

JSON Boolean type

Time/Date

  • For time values with millisecond or coarser precision: Converted to a 13-digit JSON integer that represents the timestamp in milliseconds.

  • For time values with microsecond or nanosecond precision: Converted to a JSON floating-point number that consists of a 13-digit integer part for the millisecond timestamp and a 6-digit fractional part for the nanoseconds within that millisecond.

Numeric

JSON numeric type

Byte stream

The byte stream is Base64-encoded and then converted to a UTF-8 encoded string.

text

String

UTF-8 encoded string

Boolean

Converted to the UTF-8 encoded string "true" or "false"

Time/Date

UTF-8 encoded string in yyyy-MM-dd HH:mm:ss format

Numeric

UTF-8 encoded numeric string

Byte stream

The byte stream is Base64-encoded and then converted to a UTF-8 encoded string.

Real-time synchronization: Real-time synchronization of an entire database to Kafka

Real-time synchronization of only incremental data

Built-in JSON format

String

UTF-8 encoded string

Boolean

JSON Boolean type

Time/Date

13-digit millisecond timestamp

Numeric

JSON numeric value

Byte stream

The byte stream is Base64-encoded and then converted to a UTF-8 encoded string.

Synchronization solution: One-click real-time synchronization to Kafka

Full offline synchronization + incremental real-time synchronization

Built-in JSON format

String

UTF-8 encoded string

Boolean

JSON Boolean type

Time/Date

13-digit millisecond timestamp

Numeric

JSON numeric value

Byte stream

The byte stream is Base64-encoded and then converted to a UTF-8 encoded string.
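Two of the conversions listed above, the 13-digit millisecond timestamp and the Base64 encoding of byte streams, can be sketched as follows. This is an illustration rather than the DataWorks implementation: the function names are hypothetical, and the sketch assumes that time values are timezone-aware and measured from the Unix epoch in UTC.

```python
import base64
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_millis(value: datetime) -> int:
    """Convert a timezone-aware datetime to an integer millisecond timestamp.

    Integer division on timedelta avoids floating-point rounding.
    """
    return (value - EPOCH) // timedelta(milliseconds=1)


def bytes_to_base64_text(raw: bytes) -> str:
    """Base64-encode a byte stream and return the result as a UTF-8 string."""
    return base64.b64encode(raw).decode("utf-8")


print(to_epoch_millis(datetime(2019, 4, 16, tzinfo=timezone.utc)))  # 1555372800000
print(bytes_to_base64_text(b"\x00\x01"))  # AAE=
```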

Add a data source

Before you develop a synchronization task in DataWorks, you must add the required data source to DataWorks by following the instructions in Add and manage data sources. You can view the infotips of parameters in the DataWorks console to understand the meanings of the parameters when you add a data source.

Develop a data synchronization task

For information about the entry point for and the procedure of configuring a synchronization task, see the following configuration guides.

Configure an offline synchronization task for a single table

Configure a real-time synchronization task for a single table or an entire database

For more information about the procedure, see Configure a real-time synchronization task in DataStudio.

Configure a full and incremental real-time synchronization task for a single table or an entire database

For more information about the procedure, see Configure a synchronization task in Data Integration.

Authentication configuration

SSL

If you set the Special Authentication Method for a Kafka data source to SSL or SASL_SSL, SSL authentication is enabled for the Kafka cluster. You must upload the client truststore certificate file and enter the truststore passphrase.

  • If the Kafka cluster is an Alibaba Cloud Kafka instance, see SSL certificate algorithm upgrade instructions to download the correct truststore certificate file. The truststore passphrase is KafkaOnsClient.

  • If the Kafka cluster is an EMR instance, see Use SSL encryption for Kafka connections to download the correct truststore certificate file and obtain the truststore passphrase.

  • For a self-managed cluster, you must upload the correct truststore certificate and enter the correct truststore passphrase.

The keystore certificate file, keystore passphrase, and SSL passphrase are required only when bidirectional SSL authentication is enabled for the Kafka cluster. The Kafka cluster server uses this to authenticate the client's identity. Bidirectional SSL authentication is enabled when ssl.client.auth=required is set in the server.properties file of the Kafka cluster. For more information, see Use SSL encryption for Kafka connections.

GSSAPI

If you set Sasl Mechanism to GSSAPI when configuring a Kafka data source, you must upload three authentication files: a JAAS configuration file, a Kerberos configuration file, and a Keytab file. You must also configure the DNS/HOST settings for the exclusive resource group. The following sections describe these files and the required DNS and HOST settings.

Note

For a Serverless resource group, you must configure the host address information using internal DNS resolution. For more information, see Internal DNS resolution (PrivateZone).

  • JAAS configuration file

    The JAAS file must start with KafkaClient, followed by all configuration items enclosed in a pair of braces {}:

    • The first line inside the braces defines the logon component class to use. For different SASL authentication mechanisms, the logon component class is fixed. Each subsequent configuration item is written in the key=value format.

    • All configuration items except the last one must not end with a semicolon.

    • The last configuration item must end with a semicolon. A semicolon must also be added after the closing brace "}".

    If the format requirements are not met, the JAAS configuration file cannot be parsed. The following code shows a typical JAAS configuration file format. Replace the xxx placeholders with your actual information.

    KafkaClient {
      com.sun.security.auth.module.Krb5LoginModule required
      useKeyTab=true
      keyTab="xxx"
      storeKey=true
      serviceName="kafka-server"
      principal="kafka-client@EXAMPLE.COM";
    };

    | Configuration item | Description |
    | --- | --- |
    | Logon module | Must be set to com.sun.security.auth.module.Krb5LoginModule. |
    | useKeyTab | Must be set to true. |
    | keyTab | You can specify any path. When the synchronization task runs, the keytab file uploaded during the data source configuration is automatically downloaded to a local path. This local file path is then used to fill in the keyTab configuration item. |
    | storeKey | Specifies whether the client saves the key. You can set this to true or false. It does not affect data synchronization. |
    | serviceName | Corresponds to the sasl.kerberos.service.name configuration item in the server.properties configuration file of the Kafka server. Configure this item as needed. |
    | principal | The Kerberos principal used by the Kafka client. Configure this item as needed and make sure that the uploaded keytab file contains the key for this principal. |

  • Kerberos configuration file

    The Kerberos configuration file must contain two modules: [libdefaults] and [realms].

    • The [libdefaults] module specifies Kerberos authentication parameters. Each configuration item in the module is written in the key=value format.

    • The [realms] module specifies the KDC address. It can contain multiple realm submodules. Each realm submodule starts with realm name=, followed by a set of configuration items enclosed in braces. Each configuration item is also written in the key=value format.

    The following code shows a typical Kerberos configuration file format. Replace the xxx placeholders with your actual information.

    [libdefaults]
      default_realm = xxx

    [realms]
      xxx = {
        kdc = xxx
      }

    | Configuration item | Description |
    | --- | --- |
    | [libdefaults].default_realm | The default realm used when accessing Kafka cluster nodes. This is usually the same as the realm of the client principal specified in the JAAS configuration file. |
    | Other [libdefaults] parameters | The [libdefaults] module can specify other Kerberos authentication parameters, such as ticket_lifetime. Configure them as needed. |
    | [realms].realm name | Must be the same as the realm of the client principal specified in the JAAS configuration file and the [libdefaults].default_realm. If the realm of the client principal in the JAAS configuration file is different from [libdefaults].default_realm, you need to include two realms submodules. These submodules must correspond to the realm of the client principal in the JAAS configuration file and the [libdefaults].default_realm, respectively. |
    | [realms].realm name.kdc | Specifies the KDC address and port in the ip:port format. For example, kdc=10.0.0.1:88. If the port is omitted, the default port 88 is used. For example, kdc=10.0.0.1. |

  • Keytab file

    The Keytab file must contain the key for the principal specified in the JAAS configuration file and must be verifiable by the KDC. For example, if there is a file named client.keytab in the current working directory, you can run the following command to verify whether the Keytab file contains the key for the specified principal.

    klist -ket ./client.keytab
    Keytab name: FILE:client.keytab
    KVNO Timestamp           Principal
    ---- ------------------- ------------------------------------------------------
       7 2018-07-30T10:19:16 te**@**.com (des-cbc-md5)
  • DNS and HOST configuration for an exclusive resource group

    In a Kafka cluster with Kerberos authentication enabled, the hostname of a node in the cluster is used as part of the principal registered for that node in the KDC (Key Distribution Center). When a client accesses a node in the Kafka cluster, it infers the node's principal based on the local DNS and HOST settings to obtain an access credential for the node from the KDC. When you use an exclusive resource group to access a Kafka cluster with Kerberos authentication enabled, you must correctly configure the DNS and HOST settings to ensure that the access credential for the cluster nodes can be obtained from the KDC:

    • DNS settings

      If a PrivateZone instance is used for domain name resolution of the Kafka cluster nodes in the VPC where the exclusive resource group is attached, you can add a custom route for the IP addresses 100.100.2.136 and 100.100.2.138 to the VPC attachment of the exclusive resource group in the DataWorks console. This ensures that the PrivateZone domain name resolution settings for the Kafka cluster nodes apply to the exclusive resource group.

    • HOST settings

      If a PrivateZone instance is not used for domain name resolution of the Kafka cluster nodes in the VPC where the exclusive resource group is attached, you must add the IP address-to-domain name mappings for each Kafka cluster node to the Host configuration in the network settings for the exclusive resource group in the DataWorks console.
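Two of the Kerberos rules described above lend themselves to a quick pre-upload check: the kdc value uses the ip:port format with 88 as the default port, and the [realms] module needs one submodule for the default realm plus one for the client principal's realm if the two differ. The following sketch uses hypothetical helper names and assumes IPv4-style addresses; it does not parse a full Kerberos configuration file.

```python
def parse_kdc(value: str, default_port: int = 88):
    """Split a kdc value such as "10.0.0.1:88" into (host, port).

    If the port is omitted, the default port 88 is used.
    """
    host, separator, port = value.partition(":")
    return (host, int(port) if separator else default_port)


def realm_of(principal: str) -> str:
    """Return the realm part of a principal, e.g. EXAMPLE.COM."""
    return principal.rpartition("@")[2]


def required_realms(client_principal: str, default_realm: str) -> list:
    """List the realm submodules that [realms] must contain."""
    realms = [default_realm]
    client_realm = realm_of(client_principal)
    if client_realm != default_realm:
        realms.append(client_realm)
    return realms


print(parse_kdc("10.0.0.1"))  # ('10.0.0.1', 88)
print(required_realms("kafka-client@EXAMPLE.COM", "EXAMPLE.COM"))  # ['EXAMPLE.COM']
```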

PLAIN

When you configure a Kafka data source, if you set Sasl Mechanism to PLAIN, the JAAS file must start with KafkaClient, followed by all configuration items enclosed in a pair of braces {}.

  • The first line inside the braces defines the logon component class to use. For different SASL authentication mechanisms, the logon component class is fixed. Each subsequent configuration item is written in the key=value format.

  • All configuration items except the last one must not end with a semicolon.

  • The last configuration item must end with a semicolon. A semicolon must also be added after the closing brace "}".

If the format requirements are not met, the JAAS configuration file cannot be parsed. The following code shows a typical JAAS configuration file format. Replace the xxx placeholders with your actual information.

KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="xxx"
  password="xxx";
};

| Configuration item | Description |
| --- | --- |
| Logon module | Must be set to org.apache.kafka.common.security.plain.PlainLoginModule. |
| username | The username. Configure this item as needed. |
| password | The password. Configure this item as needed. |
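Because a JAAS file that violates the semicolon rules cannot be parsed, it can help to generate the file rather than write it by hand. The following sketch renders a file that follows the rules described in this topic. The function name `render_jaas` is hypothetical, and the sketch assumes that the control flag is always required, as in the samples.

```python
def render_jaas(login_module: str, options: dict) -> str:
    """Render a JAAS configuration that follows the format rules above.

    - Starts with KafkaClient, with all items enclosed in braces.
    - The first line inside the braces names the logon module class.
    - Only the last configuration item ends with a semicolon.
    - A semicolon follows the closing brace.
    """
    def fmt(value):
        # Booleans are written unquoted (true/false); other values are quoted.
        if isinstance(value, bool):
            return "true" if value else "false"
        return f'"{value}"'

    lines = ["KafkaClient {", f"  {login_module} required"]
    for key, value in options.items():
        lines.append(f"  {key}={fmt(value)}")
    lines[-1] += ";"  # only the last item carries a semicolon
    lines.append("};")
    return "\n".join(lines)


print(render_jaas(
    "org.apache.kafka.common.security.plain.PlainLoginModule",
    {"username": "xxx", "password": "xxx"},
))
```

The same helper can produce the GSSAPI variant by passing com.sun.security.auth.module.Krb5LoginModule together with the useKeyTab, keyTab, storeKey, serviceName, and principal items.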

FAQ

Appendix: Script demos and parameter descriptions

Configure a batch synchronization task by using the code editor

If you want to configure a batch synchronization task by using the code editor, you must configure the related parameters in the script based on the unified script format requirements. For more information, see Configure a batch synchronization task by using the code editor. The following information describes the parameters that you must configure for data sources when you configure a batch synchronization task by using the code editor.

Reader script demo

The following code shows the JSON configuration for reading data from Kafka.

{
  "type": "job",
  "steps": [
    {
      "stepType": "kafka",
      "parameter": {
        "server": "host:9093",
        "column": [
          "__key__",
          "__value__",
          "__partition__",
          "__offset__",
          "__timestamp__",
          "'123'",
          "event_id",
          "tag.desc"
        ],
        "kafkaConfig": {
          "group.id": "demo_test"
        },
        "topic": "topicName",
        "keyType": "ByteArray",
        "valueType": "ByteArray",
        "beginDateTime": "20190416000000",
        "endDateTime": "20190416000006",
        "skipExceedRecord": "true"
      },
      "name": "Reader",
      "category": "reader"
    },
    {
      "stepType": "stream",
      "parameter": {
        "print": false,
        "fieldDelimiter": ","
      },
      "name": "Writer",
      "category": "writer"
    }
  ],
  "version": "2.0",
  "order": {
    "hops": [
      {
        "from": "Reader",
        "to": "Writer"
      }
    ]
  },
  "setting": {
    "errorLimit": {
      "record": "0"
    },
    "speed": {
      "throttle": true, //When throttle is false, the mbps parameter does not take effect, which means that the rate is not limited. When throttle is true, the rate is limited.
      "concurrent": 1, //The number of concurrent threads.
      "mbps": "12" //The maximum transmission rate. 1 mbps is equal to 1 MB/s.
    }
  }
}
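The column list in the demo mixes three kinds of entries: attribute columns such as __key__, the constant column '123', and data columns such as event_id and tag.desc that are read from a JSON value. The following sketch shows one way such entries could be resolved against a record. It is not the DataWorks implementation: the record is modeled as a plain dictionary, the helper name `resolve_column` is hypothetical, and returning None for a missing JSON property is an assumption.

```python
import json

ATTRIBUTE_COLUMNS = {
    "__key__", "__value__", "__partition__",
    "__headers__", "__offset__", "__timestamp__",
}


def resolve_column(column: str, record: dict):
    """Resolve one column entry against a record.

    record is a hypothetical dict with the keys key, value, partition,
    headers, offset, and timestamp; value holds a JSON string.
    """
    # Constant column: enclosed in single quotation marks.
    if len(column) >= 2 and column.startswith("'") and column.endswith("'"):
        return column[1:-1]
    # Attribute column: __key__ maps to record["key"], and so on.
    if column in ATTRIBUTE_COLUMNS:
        return record.get(column.strip("_"))
    # Data column: a property or dotted sub-property of the JSON value.
    node = json.loads(record["value"])
    for part in column.split("."):
        if not isinstance(node, dict) or part not in node:
            return None  # assumption: missing properties resolve to None
        node = node[part]
    return node


record = {
    "key": "k1",
    "value": '{"event_id": 7, "tag": {"desc": "demo"}}',
    "partition": 0,
    "offset": 42,
    "timestamp": 1555372800000,
    "headers": None,
}
columns = ["__key__", "__offset__", "'123'", "event_id", "tag.desc"]
print([resolve_column(c, record) for c in columns])
# ['k1', 42, '123', 7, 'demo']
```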

Reader script parameters

Parameter

Description

Required

datasource

The name of the data source. The code editor supports adding data sources. The value of this parameter must be the same as the name of the added data source.

Yes

server

The address of the Kafka broker server in ip:port format.

You can configure only one server, but you must make sure that DataWorks can connect to the IP addresses of all brokers in the Kafka cluster.

Yes

topic

The Kafka topic. A topic is an aggregation of message feeds that Kafka processes.

Yes

column

The Kafka data to read. Constant columns, data columns, and attribute columns are supported.

  • Constant column: A column enclosed in single quotation marks, such as ["'abc'", "'123'"].

  • Data column

    • If your data is in JSON format, you can get the properties of the JSON object, such as ["event_id"].

    • If your data is in JSON format, you can get the nested sub-properties of the JSON object, such as ["tag.desc"].

  • Attribute column

    • __key__: the key of the message.

    • __value__: the full content of the message.

    • __partition__: the partition where the current message resides.

    • __headers__: the headers of the current message.

    • __offset__: the offset of the current message.

    • __timestamp__: the timestamp of the current message.

    The following code provides a complete example.

    "column": [ "__key__", "__value__", "__partition__", "__offset__", "__timestamp__", "'123'", "event_id", "tag.desc" ]

Yes

keyType

The type of the Kafka key. Valid values: BYTEARRAY, DOUBLE, FLOAT, INTEGER, LONG, and SHORT.

Yes

valueType

The type of the Kafka value. Valid values: BYTEARRAY, DOUBLE, FLOAT, INTEGER, LONG, and SHORT.

Yes

beginDateTime

The start offset for data consumption. This is the left boundary of the time range (inclusive). It is a time string in the yyyymmddhhmmss format. You can use it with Scheduling Parameters. For more information, see Supported formats of scheduling parameters.

Note

This feature is supported in Kafka 0.10.2 and later.

You must specify either this parameter or beginOffset.

Note

beginDateTime and endDateTime are used together.

endDateTime

The end offset for data consumption. This is the right boundary of the time range (exclusive). It is a time string in the yyyymmddhhmmss format. You can use it with Scheduling Parameters. For more information, see Supported formats of scheduling parameters.

Note

This feature is supported in Kafka 0.10.2 and later.

You must specify either this parameter or endOffset.

Note

endDateTime and beginDateTime are used together.

beginOffset

The start offset for data consumption. You can configure it in the following forms:

  • A number, such as 15553274, which indicates the starting offset for consumption.

  • seekToBeginning: indicates that data is consumed from the earliest offset.

  • seekToLast: indicates that data is read from the offset saved for the group ID specified by group.id in the kafkaConfig configuration. Note that the group offset is automatically committed to the Kafka server by the client at regular intervals. Therefore, if a task fails and is rerun, data may be duplicated or lost. If the skipExceedRecord parameter is set to true, the task may discard the last few records that were read. The group offset for this discarded data has already been committed to the server, so this data cannot be read in the next task run.

  • seekToEnd: indicates that data is consumed from the latest offset. In this case, no data is read.

You must specify either this parameter or beginDateTime.

endOffset

The end offset for data consumption. This is used to control when the data consumption task exits.

You must specify either this parameter or endDateTime.

skipExceedRecord

The Kafka consumer uses the poll method, public ConsumerRecords<K, V> poll(final Duration timeout), to consume data. A single poll call may fetch data that is outside the range of endOffset or endDateTime. skipExceedRecord controls whether to write the excess data to the destination. Because data consumption uses automatic offset commits, we recommend the following:

  • For Kafka versions earlier than 0.10.2: Set skipExceedRecord to false.

  • For Kafka 0.10.2 and later: Set skipExceedRecord to true.

No. The default value is false.

partition

A Kafka topic can have multiple partitions. By default, a data synchronization task reads data from an offset range that covers all partitions in the topic. You can also specify a partition to read data only from the offset range of that single partition.

No. No default value.

kafkaConfig

When creating a KafkaConsumer client for data consumption, you can specify extended parameters such as bootstrap.servers, auto.commit.interval.ms, and session.timeout.ms. You can use kafkaConfig to control the consumption behavior of KafkaConsumer.

No

encoding

When keyType or valueType is set to STRING, the encoding specified by this parameter is used to parse the string.

No. The default value is UTF-8.

waitTIme

The maximum time, in seconds, that the consumer object waits to pull data from Kafka in a single attempt.

No. The default value is 60.

stopWhenPollEmpty

The valid values are true and false. If this parameter is set to true and the consumer pulls empty data from Kafka (usually because all data in the topic has been read, or due to network or Kafka cluster availability issues), the task stops immediately. Otherwise, it retries until data is read again.

No. The default value is true.

stopWhenReachEndOffset

This parameter takes effect only when stopWhenPollEmpty is true. The valid values are true and false.

  • If this parameter is set to true and the consumer pulls empty data from Kafka, it checks whether the latest data in the Kafka topic partitions has been read. If the latest data from all partitions has been read, the task stops immediately. Otherwise, it continues to try to pull data from the Kafka topic.

  • If this parameter is set to false and the consumer pulls empty data from Kafka, it does not perform the check and stops the task immediately.

No. The default value is false.

Note

This is for compatibility with historical logic. Kafka versions earlier than 0.10.2 cannot check whether the latest data from all partitions of a Kafka topic has been read. However, some code editor tasks may still read data from Kafka versions earlier than 0.10.2.
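The interaction between stopWhenPollEmpty and stopWhenReachEndOffset when a poll returns no data can be summarized as a small decision function. This is a sketch of the behavior as described in the parameter table, not the actual task code. The function name and the all_partitions_at_latest flag are hypothetical.

```python
def should_stop_on_empty_poll(
    stop_when_poll_empty: bool = True,
    stop_when_reach_end_offset: bool = False,
    all_partitions_at_latest: bool = False,
) -> bool:
    """Decide whether the task stops after the consumer pulls empty data.

    Defaults mirror the documented defaults: stopWhenPollEmpty is true
    and stopWhenReachEndOffset is false.
    """
    if not stop_when_poll_empty:
        return False  # keep retrying until data is read again
    if not stop_when_reach_end_offset:
        return True   # stop immediately without checking the partitions
    # Stop only if the latest data of all partitions has been read.
    return all_partitions_at_latest


print(should_stop_on_empty_poll())  # True
print(should_stop_on_empty_poll(True, True, all_partitions_at_latest=False))  # False
```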

The following table describes the kafkaConfig parameters.

| Parameter | Description |
| --- | --- |
| fetch.min.bytes | Specifies the minimum number of bytes of messages that the consumer can get from the broker. The data is returned to the consumer only when there is enough data. |
| fetch.max.wait.ms | The maximum time to wait for the broker to return data. The default value is 500 milliseconds. Data is returned based on whichever condition is met first: fetch.min.bytes or fetch.max.wait.ms. |
| max.partition.fetch.bytes | Specifies the maximum number of bytes that the broker can return to the consumer from each partition. The default value is 1 MB. |
| session.timeout.ms | Specifies the time that the consumer can be disconnected from the server before it stops receiving services. The default value is 30 seconds. |
| auto.offset.reset | The action that the consumer takes when reading with no offset or an invalid offset (because the consumer has been inactive for a long time and the record with the offset has expired and been deleted). The default value is none, which means the offset is not automatically reset. You can change it to earliest, which means the consumer reads the partition records from the earliest offset. |
| max.poll.records | The number of messages that can be returned by a single call to the poll method. |
| key.deserializer | The deserialization method for the message key, such as org.apache.kafka.common.serialization.StringDeserializer. |
| value.deserializer | The deserialization method for the data value, such as org.apache.kafka.common.serialization.StringDeserializer. |
| ssl.truststore.location | The path of the SSL root certificate. |
| ssl.truststore.password | The password for the root certificate store. If you are using Alibaba Cloud Kafka, set this to KafkaOnsClient. |
| security.protocol | The access protocol. Currently, only the SASL_SSL protocol is supported. |
| sasl.mechanism | The SASL authentication method. If you are using Alibaba Cloud Kafka, use PLAIN. |
| java.security.auth.login.config | The path of the SASL authentication file. |
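As an illustration of how the security-related entries fit together, the following sketch assembles a kafkaConfig block for an Alibaba Cloud Kafka instance by using the values named in the table (SASL_SSL, PLAIN, and the KafkaOnsClient truststore password). The function name and the file paths are hypothetical placeholders, and whether a particular task needs exactly this set of keys is an assumption.

```python
import json


def build_sasl_ssl_kafka_config(truststore_path: str, jaas_path: str, group_id: str) -> dict:
    """Assemble kafkaConfig entries for a SASL_SSL connection.

    The paths are placeholders supplied by the caller. The fixed values
    come from the kafkaConfig parameter table.
    """
    return {
        "group.id": group_id,
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "PLAIN",
        "ssl.truststore.location": truststore_path,
        "ssl.truststore.password": "KafkaOnsClient",
        "java.security.auth.login.config": jaas_path,
    }


config = build_sasl_ssl_kafka_config(
    "/path/to/truststore.jks",   # hypothetical path
    "/path/to/kafka_jaas.conf",  # hypothetical path
    "demo_test",
)
print(json.dumps({"kafkaConfig": config}, indent=2))
```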

Writer script demo

The following code shows the JSON configuration for writing data to Kafka.

{
  "type": "job",
  "version": "2.0", //The version number.
  "steps": [
    {
      "stepType": "stream",
      "parameter": {},
      "name": "Reader",
      "category": "reader"
    },
    {
      "stepType": "Kafka", //The plug-in name.
      "parameter": {
        "server": "ip:9092", //The server address of Kafka.
        "keyIndex": 0, //The column to be used as the key. Must follow camel case naming conventions, with k in lowercase.
        "valueIndex": 1, //The column to be used as the value. Currently, you can only select one column from the source data or leave this parameter empty. If empty, all source data is used.
        //For example, to use the 2nd, 3rd, and 4th columns of an ODPS table as the kafkaValue, create a new ODPS table, clean and integrate the data from the original ODPS table into the new table, and then use the new table for synchronization.
        "keyType": "Integer", //The type of the Kafka key.
        "valueType": "Short", //The type of the Kafka value.
        "topic": "t08", //The Kafka topic.
        "batchSize": 1024 //The amount of data written to Kafka at a time, in bytes.
      },
      "name": "Writer",
      "category": "writer"
    }
  ],
  "setting": {
    "errorLimit": {
      "record": "0" //The number of error records.
    },
    "speed": {
      "throttle": true, //When throttle is false, the mbps parameter does not take effect, which means that the rate is not limited. When throttle is true, the rate is limited.
      "concurrent": 1, //The number of concurrent jobs.
      "mbps": "12" //The maximum transmission rate. 1 mbps is equal to 1 MB/s.
    }
  },
  "order": {
    "hops": [
      {
        "from": "Reader",
        "to": "Writer"
      }
    ]
  }
}

Writer script parameters

Parameter

Description

Required

datasource

The name of the data source. The code editor supports adding data sources. The value of this parameter must be the same as the name of the added data source.

Yes

server

The address of the Kafka server in ip:port format.

Yes

topic

The Kafka topic. It is a category for different message feeds that Kafka processes.

Each message published to a Kafka cluster has a category, which is called a topic. A topic is a collection of a group of messages.

Yes

valueIndex

The column in the Kafka writer that is used as the value. If this is not specified, all columns are concatenated to form the value by default. The separator is specified by fieldDelimiter.

No

writeMode

When valueIndex is not configured, this parameter determines the format for concatenating all columns of the source record to form the value of the Kafka record. Valid values are text and JSON. The default value is text.

  • If set to text, all columns are concatenated using the separator specified by fieldDelimiter.

  • If set to JSON, all columns are concatenated into a JSON string based on the field names specified by the column parameter.

For example, if a source record has three columns with values a, b, and c, and writeMode is set to text and fieldDelimiter is set to #, the value of the Kafka record written is the string a#b#c. If writeMode is set to JSON and column is set to [{"name":"col1"},{"name":"col2"},{"name":"col3"}], the value of the Kafka record written is the string {"col1":"a","col2":"b","col3":"c"}.

If valueIndex is configured, this parameter is invalid.

No

column

The fields in the destination table to which data is to be written, separated by commas. For example: "column": ["id", "name", "age"].

When valueIndex is not configured and writeMode is set to JSON, this parameter defines the field names in the JSON structure for the column values of the source record. For example, "column": [{"name":"id","type":"JSON_NUMBER"}, {"name":"name","type":"JSON_STRING"}, {"name":"age","type":"JSON_NUMBER"}].

  • If the number of columns in the source record is greater than the number of field names configured in column, the data is truncated during writing. For example:

    If a source record has three columns with values a, b, and c, and column is configured as [{"name":"col1","type":"JSON_STRING"},{"name":"col2","type":"JSON_STRING"}], the value of the Kafka record written is the string {"col1":"a","col2":"b"}.

  • If the number of columns in the source record is less than the number of field names configured in column, the extra field names in the column configuration are filled with null or the string specified by nullValueFormat. For example:

    If a source record has two columns with values a and b, and column is configured as [{"name":"col1","type":"JSON_STRING"},{"name":"col2","type":"JSON_STRING"},{"name":"col3","type":"JSON_STRING"}], the value of the Kafka record written is the string {"col1":"a","col2":"b","col3":null}.

  • If the JSON field type is not configured, the default field type is JSON_STRING.

  • For the valid values of the JSON field type, see Appendix: JSON field types.

If valueIndex is configured, or if writeMode is set to text, this parameter is invalid.

Required when valueIndex is not configured and writeMode is set to JSON.

partition

Specifies the number of the partition in the Kafka topic to which data is written. This must be an integer greater than or equal to 0.

No

keyIndex

The ordinal number of the source column whose value is used as the key of the Kafka record.

The value of the keyIndex parameter must be an integer greater than or equal to 0. Otherwise, the task will fail.

No

keyIndexes

An array of the ordinal numbers of the columns in the source record that are used as the key for the Kafka record.

Column ordinal numbers start from 0. For example, if this parameter is set to [0,1,2], the values of the columns at those ordinal numbers are concatenated with commas to form the key of the Kafka record. If this parameter is not specified, the key of the Kafka record is null, and the data is written to the partitions of the topic in a round-robin fashion. You can specify either this parameter or keyIndex, but not both.

No

fieldDelimiter

When writeMode is set to text and valueIndex is not configured, all columns of the source record are concatenated using the column separator specified by this parameter to form the value of the Kafka record. You can configure a single character or multiple characters as the separator. You can configure Unicode characters in the format \u0001. Escape characters such as \t and \n are supported. The default value is \t.

If writeMode is not set to text or if valueIndex is configured, this parameter is invalid.

No

keyType

The type of the Kafka key. Valid values: BYTEARRAY, DOUBLE, FLOAT, INTEGER, LONG, and SHORT.

Yes

valueType

The type of the Kafka value. Valid values: BYTEARRAY, DOUBLE, FLOAT, INTEGER, LONG, and SHORT.

Yes

nullKeyFormat

If the value of the source column specified by keyIndex or keyIndexes is null, it is replaced with the string specified by this parameter. If this is not configured, no replacement is made.

No

nullValueFormat

If the value of a source column is null, it is replaced with the string specified by this parameter when assembling the value of the Kafka record. If this is not configured, no replacement is made.

No

acks

The acks setting that is used when the Kafka producer is initialized. It determines how a successful write is acknowledged. The default value is all. Valid values:

  • 0: No acknowledgment for successful writes.

  • 1: Acknowledgment for successful write to the primary replica.

  • all: Acknowledgment for successful write to all replicas.

No
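The following Python sketch illustrates how the parameters above combine to form the key and value of a Kafka record. It is a simplified model and not the writer's actual implementation. The function names build_value and build_key are hypothetical, JSON field types are ignored (all values are passed through as given), and the rendering of an unreplaced null as an empty string in text mode and in the key is an assumption.

```python
import json


def build_value(record, write_mode="text", field_delimiter="\t",
                column=None, null_value_format=None):
    """Assemble the Kafka record value from a list of source column values.

    Hypothetical helper that models writeMode, fieldDelimiter, column,
    and nullValueFormat when valueIndex is not configured.
    """
    # Nulls are replaced only when nullValueFormat is configured.
    values = [null_value_format if v is None and null_value_format is not None else v
              for v in record]
    if write_mode == "text":
        # Assumption: a null that was not replaced is rendered as an empty string.
        return field_delimiter.join("" if v is None else str(v) for v in values)
    if write_mode == "JSON":
        names = [c["name"] for c in column]
        # Extra source columns are truncated; missing ones are filled with
        # null or with the string specified by nullValueFormat.
        padded = values[:len(names)] + [null_value_format] * (len(names) - len(values))
        return json.dumps(dict(zip(names, padded)), separators=(",", ":"))
    raise ValueError(f"unsupported writeMode: {write_mode}")


def build_key(record, key_indexes=None, null_key_format=None):
    """Assemble the Kafka record key from the columns listed in keyIndexes."""
    if not key_indexes:
        # No key configured: the key is null and partitions are chosen round-robin.
        return None
    parts = []
    for i in key_indexes:
        v = record[i]
        if v is None and null_key_format is not None:
            v = null_key_format
        # Assumption: a null that was not replaced is rendered as an empty string.
        parts.append("" if v is None else str(v))
    return ",".join(parts)


record = ["a", "b", "c"]
print(build_value(record, "text", "#"))
print(build_value(record, "JSON",
                  column=[{"name": "col1"}, {"name": "col2"}, {"name": "col3"}]))
print(build_key(record, [0, 1, 2]))
```

With the sample record, the sketch prints a#b#c, then {"col1":"a","col2":"b","col3":"c"}, then a,b,c, which matches the examples given for writeMode and keyIndexes.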

Appendix: Definition of message format for writing to Kafka

After you configure and run a real-time synchronization task, the data read from the source database is written to a Kafka topic in JSON format. First, all existing data in the specified source table is written to the corresponding Kafka topic. Then, the task starts real-time synchronization to continuously write incremental data to the topic. Incremental DDL change information from the source table is also written to the Kafka topic in JSON format. You can obtain the status and change information of messages written to Kafka. For more information, see Appendix: Message format.

Note

In the JSON structure of data written to Kafka by an offline synchronization task, the payload.sequenceId, payload.timestamp.eventTime, and payload.timestamp.checkpointTime fields are set to -1.

Appendix: JSON field types

When writeMode is set to JSON, you can use the type field in the column parameter to specify the JSON data type. During a write operation, the system attempts to convert the value of the column in the source record to the specified type. If this conversion fails, dirty data is generated.

Valid value

Description

JSON_STRING

Converts the source record column value to a string and writes it to the JSON field. For example, if the source record column value is the integer 123 and column is configured as [{"name":"col1","type":"JSON_STRING"}], the value of the Kafka record written is the string {"col1":"123"}.

JSON_NUMBER

Converts the source record column value to a number and writes it to the JSON field. For example, if the source record column value is the string 1.23 and column is configured as [{"name":"col1","type":"JSON_NUMBER"}], the value of the Kafka record written is the string {"col1":1.23}.

JSON_BOOL

Converts the source record column value to a Boolean value and writes it to the JSON field. For example, if the source record column value is the string true and column is configured as [{"name":"col1","type":"JSON_BOOL"}], the value of the Kafka record written is the string {"col1":true}.

JSON_ARRAY

Converts the source record column value to a JSON array and writes it to the JSON field. For example, if the source record column value is the string [1,2,3] and column is configured as [{"name":"col1","type":"JSON_ARRAY"}], the value of the Kafka record written is the string {"col1":[1,2,3]}.

JSON_MAP

Converts the source record column value to a JSON object and writes it to the JSON field. For example, if the source record column value is the string {"k1":"v1"} and column is configured as [{"name":"col1","type":"JSON_MAP"}], the value of the Kafka record written is the string {"col1":{"k1":"v1"}}.

JSON_BASE64

Converts the binary byte content of the source record column value to a BASE64-encoded string and writes it to the JSON field. For example, if the source record column value is a byte array of length 2, represented in hexadecimal as 0x01 0x02, and column is configured as [{"name":"col1","type":"JSON_BASE64"}], the value of the Kafka record written is the string {"col1":"AQI="}.

JSON_HEX

Converts the binary byte content of the source record column value to a hexadecimal numeric string and writes it to the JSON field. For example, if the source record column value is a byte array of length 2, represented in hexadecimal as 0x01 0x02, and column is configured as [{"name":"col1","type":"JSON_HEX"}], the value of the Kafka record written is the string {"col1":"0102"}.
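The conversions in the table can be approximated with the following Python sketch. It is a minimal illustration and not the writer's actual conversion logic. The function name convert is hypothetical, and the exact parsing rules (for example, which strings are accepted as Boolean values) are assumptions. A raised ValueError stands in for a record that would be reported as dirty data.

```python
import base64
import json


def convert(value, json_type="JSON_STRING"):
    """Convert one source column value to the configured JSON field type.

    Hypothetical helper; raises ValueError where the real writer would
    report dirty data.
    """
    if json_type == "JSON_STRING":
        return str(value)
    if json_type == "JSON_NUMBER":
        text = str(value)
        try:
            return int(text)
        except ValueError:
            return float(text)  # raises ValueError if not numeric
    if json_type == "JSON_BOOL":
        # Assumption: only "true" and "false" (case-insensitive) are accepted.
        text = str(value).lower()
        if text in ("true", "false"):
            return text == "true"
        raise ValueError(f"not a Boolean value: {value!r}")
    if json_type in ("JSON_ARRAY", "JSON_MAP"):
        parsed = json.loads(value)  # JSONDecodeError is a subclass of ValueError
        expected = list if json_type == "JSON_ARRAY" else dict
        if not isinstance(parsed, expected):
            raise ValueError(f"value does not match {json_type}: {value!r}")
        return parsed
    if json_type == "JSON_BASE64":
        return base64.b64encode(value).decode("ascii")
    if json_type == "JSON_HEX":
        return value.hex()
    raise ValueError(f"unknown JSON field type: {json_type}")


def to_record_value(name, value, json_type="JSON_STRING"):
    """Render a single-field Kafka record value as a compact JSON string."""
    return json.dumps({name: convert(value, json_type)}, separators=(",", ":"))


print(to_record_value("col1", 123, "JSON_STRING"))
print(to_record_value("col1", "1.23", "JSON_NUMBER"))
print(to_record_value("col1", b"\x01\x02", "JSON_BASE64"))
print(to_record_value("col1", b"\x01\x02", "JSON_HEX"))
```

The four calls print {"col1":"123"}, {"col1":1.23}, {"col1":"AQI="}, and {"col1":"0102"}, which matches the examples in the table.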