You can use Kafka as a source for single-table real-time synchronization to capture data from a message queue in real time and write the data to a destination. This topic describes how to configure the Kafka input component.
Scope
Supports Alibaba Cloud Kafka and self-managed Kafka versions from 0.10.2 to 2.2.x, inclusive.
Kafka versions earlier than 0.10.2 do not support retrieving partition data offsets, and their data structure may not support timestamps. As a result, latency statistics for sync tasks may be incorrect, and you may be unable to correctly reset the consumer offset.
For more information, see Configure a Kafka data source.
Procedure
Go to the DataStudio page.
Log on to the DataWorks console. In the top navigation bar, select the desired region. In the left-side navigation pane, choose . On the page that appears, select the desired workspace from the drop-down list and click Go to Data Development.
In the Scheduled Workflow pane of the DataStudio page, move the pointer over the
icon and choose .
Alternatively, find the desired workflow in the Scheduled Workflow pane, right-click the workflow name, and then choose
.
In the Create Node dialog box, set the Sync Method parameter to End-to-end ETL and configure the Name and Path parameters.
Click Confirm.
On the configuration page of the real-time sync node, drag the Kafka input to the editing panel.
Click the Kafka node. In the Node Configuration dialog box, configure the parameters.
Data Source
Select a configured Kafka data source. Only Kafka data sources are supported. If no data source is available, click New Data Source on the right to go to the data source configuration page and create one. For more information, see Configure a Kafka data source.
Topic
The name of the Kafka topic. Topics are categories that Kafka uses to organize message feeds.
Each message published to a Kafka cluster belongs to a topic. A topic is a collection of a group of messages.
Note: A Kafka input supports only one topic.
Key Type
The type of the key in the Kafka message. This value determines the key.deserializer configuration when you initialize a KafkaConsumer. Valid values: STRING, BYTEARRAY, DOUBLE, FLOAT, INTEGER, LONG, and SHORT.
Value Type
The type of the value in the Kafka message. This value determines the value.deserializer configuration when you initialize a KafkaConsumer. Valid values: STRING, BYTEARRAY, DOUBLE, FLOAT, INTEGER, LONG, and SHORT.
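As an illustration of what these two settings control, the following Java sketch shows a plain KafkaConsumer whose key.deserializer and value.deserializer correspond to a STRING key type and a STRING value type. The broker address, group ID, and topic name are placeholders, and this is not the sync task's own code.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KeyValueTypeExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");              // placeholder group ID
        // Key Type = STRING and Value Type = STRING translate to StringDeserializer.
        // Other choices map to the matching classes, for example LongDeserializer
        // for LONG or ByteArrayDeserializer for BYTEARRAY.
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("demo-topic"));      // placeholder topic
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("key=%s value=%s%n", record.key(), record.value());
            }
        }
    }
}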
Output Mode
The method used to parse Kafka records.
Single-row output: Parses a Kafka record as an unstructured string or a JSON object. One Kafka record is parsed into one output record.
Multi-row output: Parses a Kafka record as a JSON array. Each array element is parsed into one output record. Therefore, one Kafka record can be parsed into multiple output records.
Note: This configuration item is supported only in some regions. If you do not see this item, wait for the feature to be released in your region.
Path Of Array
When Output Mode is set to Multi-row output, specify the path of the JSON array in the Kafka record value. The path can reference a field in a specific JSON object in the a.a1 format, or a field in a specific JSON array in the a[0].a1 format. If you leave this configuration item empty, the entire Kafka record value is parsed as a JSON array.
Note: The target JSON array to be parsed must be an array of objects, such as [{"a":"hello"},{"b":"world"}], not a numeric or string array, such as ["a","b"].
Configuration Parameters
When you create a Kafka data consumption client (KafkaConsumer), you can configure kafkaConfig extension parameters for fine-grained control over its data reading behavior. For a complete list of parameters supported by different Kafka cluster versions, see the official Kafka documentation for your version.
Common parameter examples:
bootstrap.servers
auto.commit.interval.ms
session.timeout.ms
group.id
Default behavior: By default, a real-time sync task sets a randomly generated string as the group.id for the KafkaConsumer.
Manual configuration: You can manually specify a fixed group.id. This lets you monitor or observe the consumer offset of the sync task in the Kafka cluster through the specified consumer group.
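To make the behavior described above concrete, here is a minimal Java sketch of how such extension parameters and the group.id default could be combined into consumer properties. The method name, broker address, and example values are illustrative assumptions, not the actual implementation used by the sync task.

import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class KafkaConfigExample {
    // Hypothetical helper: start from the required settings, apply user-supplied
    // extension parameters, and fall back to a random group.id when none is given.
    static Properties buildConsumerProps(String bootstrapServers, Map<String, String> kafkaConfig) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // Extension parameters such as session.timeout.ms or auto.commit.interval.ms
        // are passed through to the consumer unchanged.
        props.putAll(kafkaConfig);
        // Default behavior: a randomly generated group.id unless one is specified.
        props.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "sync-" + UUID.randomUUID());
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildConsumerProps(
            "localhost:9092",                            // placeholder broker address
            Map.of("group.id", "realtime_sync_demo",     // fixed group.id for offset monitoring
                   "session.timeout.ms", "30000"));
        System.out.println(props);
    }
}

With a fixed group.id such as realtime_sync_demo, you can then observe the committed offsets of that consumer group with your usual Kafka tooling.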
Output Fields
Customize the output field names for Kafka data:
Click Add More Fields, enter a Field Name, and select a Type to add a custom field.
The Value Method specifies how to get field values from Kafka records. Click the
icon on the right to switch between the two value methods.
Preset value methods: Provides six preset options to get values from Kafka records:
value: message body
key: message key
partition: partition number
offset: message offset
timestamp: message timestamp in milliseconds
headers: message header
JSON parsing: You can use the . (to get a subfield) and [] (to get an array element) syntax to get content from a complex JSON format. For backward compatibility, you can also use a string wrapped in double underscores (__), such as __value__, to get specific content from a Kafka record as the field value. The following code shows sample Kafka data.
{ "a": { "a1": "hello" }, "b": "world", "c":[ "xxxxxxx", "yyyyyyy" ], "d":[ { "AA":"this", "BB":"is_data" }, { "AA":"that", "BB":"is_also_data" } ] }
The value for the output field varies based on the situation:
To sync the Kafka record value, set the value method to __value__.
To sync the Kafka record key, set the value method to __key__.
To sync the Kafka record partition, set the value method to __partition__.
To sync the Kafka record offset, set the value method to __offset__.
To sync the Kafka record timestamp, set the value method to __timestamp__.
To sync the Kafka record headers, set the value method to __headers__.
To sync the data "hello" from the a1 field, set the value method to a.a1.
To sync the data "world from the b field, set the value method to b.
To sync the data "yyyyyyy" from the c field, set the value method to c[1].
To sync the data "this" from the AA field, set the value method to d[0].AA.
Move the pointer over the field you want to remove and click the
icon.
Scenario example: If you set Output Mode to Multi-row Output, the system first parses the JSON array based on the JSON path specified in Path of array. Then, it takes each JSON object from the array and forms the output fields based on the defined field names and value methods. The value method definition is the same as in the single-row output mode. You can use the . (to get a subfield) and [] (to get an array element) syntax to get content from a complex JSON format. The following code shows sample Kafka instance data:
{ "c": { "c0": [ { "AA": "this", "BB": "is_data" }, { "AA": "that", "BB": "is_also_data" } ] } }
If you set Path Of Array to c.c0 and define two output fields, one named AA with the value method AA and another named BB with the value method BB, this Kafka record is parsed into the following two records:
Record 1: AA = "this", BB = "is_data"
Record 2: AA = "that", BB = "is_also_data"
Click the
icon in the toolbar.