DataWorks: Configure Kafka input

Last Updated: Sep 28, 2025

You can use Kafka as a source for single-table real-time synchronization to capture data from a message queue in real time and write the data to a destination. This topic describes how to configure the Kafka input component.

Scope

  • Supports Alibaba Cloud Kafka and self-managed Kafka versions from 0.10.2 to 2.2.x, inclusive.

  • Kafka versions earlier than 0.10.2 do not support retrieving partition data offsets, and their data structure may not support timestamps. As a result, latency statistics for sync tasks may be incorrect, and you may be unable to correctly reset the consumer offset.

For more information, see Configure a Kafka data source.

Procedure

  1. Go to the DataStudio page.

    Log on to the DataWorks console. In the top navigation bar, select the desired region. In the left-side navigation pane, choose Data Development and O&M > Data Development. On the page that appears, select the desired workspace from the drop-down list and click Go to Data Development.

  2. In the Scheduled Workflow pane of the DataStudio page, move the pointer over the Create icon and choose Create Node > Data Integration > Real-time Synchronization.

    Alternatively, find the desired workflow in the Scheduled Workflow pane, right-click the workflow name, and then choose Create Node > Data Integration > Real-time Synchronization.

  3. In the Create Node dialog box, set the Sync Method parameter to End-to-end ETL and configure the Name and Path parameters.

  4. Click Confirm.

  5. On the configuration page of the real-time sync node, drag Input > Kafka to the editing panel.

  6. Click the Kafka node. In the Node Configuration dialog box, configure the parameters.

    The following table describes the parameters that you can configure.

    Data Source

    Select a configured Kafka data source. Only Kafka data sources are supported. If no data source is available, click New Data Source on the right to go to the Workspace Management > Data Source page and create one. For more information, see Configure a Kafka data source.

    Topic

    The name of the Kafka topic. A topic is the category that Kafka uses to organize its message feeds; each message published to a Kafka cluster belongs to a topic.

    Note

    A Kafka input supports only one topic.

    Key Type

    The type of the key in the Kafka message. This value determines the key.deserializer configuration when you initialize a KafkaConsumer. Valid values: STRING, BYTEARRAY, DOUBLE, FLOAT, INTEGER, LONG, and SHORT.

    Value Type

    The type of the value in the Kafka message. This value determines the value.deserializer configuration when you initialize a KafkaConsumer. Valid values: STRING, BYTEARRAY, DOUBLE, FLOAT, INTEGER, LONG, and SHORT.

    Output Mode

    The method used to parse Kafka records.

    • Single-row output: Parses a Kafka record as an unstructured string or a JSON object. One Kafka record is parsed into one output record.

    • Multi-row output: Parses a Kafka record as a JSON array. Each array element is parsed into one output record. Therefore, one Kafka record can be parsed into multiple output records.

    Note

    This configuration item is supported only in some regions. If you do not see this item, wait for the feature to be released in your region.

    Path Of Array

    When Output mode is set to Multi-row output, specify the path of the JSON array in the Kafka record value. The path supports referencing a field in a JSON object in the a.a1 format, or a field in an element of a JSON array in the a[0].a1 format. If you leave this configuration item empty, the entire Kafka record value is parsed as a JSON array.

    Note that the target JSON array to be parsed must be an object array, such as [{"a":"hello"},{"b":"world"}], not a numeric or string array, such as ["a","b"].

    Configuration Parameters

    When you create a Kafka data consumption client (KafkaConsumer), you can configure kafkaConfig extension parameters for fine-grained control over its data reading behavior. For a complete list of parameters supported by different Kafka cluster versions, see the official Kafka documentation for your version.

    • Common parameter examples:

      • bootstrap.servers

      • auto.commit.interval.ms

      • session.timeout.ms

    • group.id

      • Default behavior
        By default, a real-time sync task sets a randomly generated string as the group.id for the KafkaConsumer.

      • Manual configuration
        You can manually specify a fixed group.id. This lets you monitor or observe the consumer offset of the sync task in the Kafka cluster using the specified consumer group.
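
    The property names above map directly onto a plain Kafka consumer configuration. The following is a rough sketch only, not the Data Integration implementation: the broker address, topic name, and group.id are placeholders, and the deserializer classes correspond to Key Type and Value Type both set to STRING.

      import java.util.Collections;
      import java.util.Properties;
      import org.apache.kafka.clients.consumer.KafkaConsumer;

      public class KafkaInputConfigSketch {
          public static void main(String[] args) {
              Properties props = new Properties();
              // Placeholder broker address; in practice this comes from the Kafka data source.
              props.put("bootstrap.servers", "broker1:9092");
              // key.deserializer and value.deserializer follow the Key Type and Value Type
              // settings; STRING maps to StringDeserializer.
              props.put("key.deserializer",
                      "org.apache.kafka.common.serialization.StringDeserializer");
              props.put("value.deserializer",
                      "org.apache.kafka.common.serialization.StringDeserializer");
              // A fixed group.id makes the consumer offset of the sync task observable in the
              // Kafka cluster; if it is not set, the task generates a random group.id.
              props.put("group.id", "my-sync-task-group");
              // Other kafkaConfig extension parameters are passed through in the same way.
              props.put("auto.commit.interval.ms", "5000");
              props.put("session.timeout.ms", "30000");

              try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                  consumer.subscribe(Collections.singletonList("my_topic"));
              }
          }
      }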

    Output Fields

    Customize the output field names for Kafka data:

    • Click Add More Fields, enter a Field Name, and select a Type to add a custom field.

      The Value Method specifies how to get field values from Kafka records. Click the arrow icon on the right to switch between the two value methods.

      • Preset value methods: Provides six preset options to get values from Kafka records:

        • value: message body

        • key: message key

        • partition: partition number

        • offset: message offset

        • timestamp: message timestamp in milliseconds

        • headers: message header

      • JSON parsing: You can use the . (to get a subfield) and [] (to get an array element) syntax to get content from a complex JSON format. For backward compatibility, you can also use a string that starts and ends with two underscores (__), such as __value__, to get specific content from a Kafka record as the field value. The following code shows sample Kafka data, and the sketch after this list illustrates these lookups.

        { "a": { "a1": "hello" }, "b": "world", "c":[ "xxxxxxx", "yyyyyyy" ], "d":[ { "AA":"this", "BB":"is_data" }, { "AA":"that", "BB":"is_also_data" } ] }
        • The value for the output field varies based on the situation:

          • To sync the Kafka record value, set the value method to __value__.

          • To sync the Kafka record key, set the value method to __key__.

          • To sync the Kafka record partition, set the value method to __partition__.

          • To sync the Kafka record offset, set the value method to __offset__.

          • To sync the Kafka record timestamp, set the value method to __timestamp__.

          • To sync the Kafka record headers, set the value method to __headers__.

          • To sync the data "hello" from the a1 field, set the value method to a.a1.

          • To sync the data "world from the b field, set the value method to b.

          • To sync the data "yyyyyyy" from the c field, set the value method to c[1].

          • To sync the data "this" from the AA field, set the value method to d[0].AA.

    • Move the pointer over the field you want to remove and click the delete icon.
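
    As referenced above, the following sketch shows how the JSON-path value methods resolve against the sample data. It uses the Jackson library purely for illustration and is not the Data Integration implementation; the class name is made up.

      import com.fasterxml.jackson.databind.JsonNode;
      import com.fasterxml.jackson.databind.ObjectMapper;

      public class ValueMethodSketch {
          public static void main(String[] args) throws Exception {
              String value = "{ \"a\": { \"a1\": \"hello\" }, \"b\": \"world\","
                      + " \"c\": [ \"xxxxxxx\", \"yyyyyyy\" ],"
                      + " \"d\": [ { \"AA\": \"this\", \"BB\": \"is_data\" },"
                      + " { \"AA\": \"that\", \"BB\": \"is_also_data\" } ] }";
              JsonNode root = new ObjectMapper().readTree(value);

              System.out.println(root.path("a").path("a1").asText());       // a.a1    -> "hello"
              System.out.println(root.path("b").asText());                  // b       -> "world"
              System.out.println(root.path("c").get(1).asText());           // c[1]    -> "yyyyyyy"
              System.out.println(root.path("d").get(0).path("AA").asText()); // d[0].AA -> "this"
          }
      }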

    Scenario example: If you set Output Mode to Multi-row Output, the system first parses the JSON array based on the JSON path specified in Path of array. Then, it takes each JSON object from the array and forms the output fields based on the defined field names and value methods. The value method definition is the same as in the single-row output mode. You can use the . (to get a subfield) and [] (to get an array element) syntax to get content from a complex JSON format. The following code shows sample Kafka instance data:

    { "c": { "c0": [ { "AA": "this", "BB": "is_data" }, { "AA": "that", "BB": "is_also_data" } ] } }

    If you set Path of array to c.c0 and define two output fields, one named AA with the value method AA and another named BB with the value method BB, this Kafka record is parsed into two output records: (AA=this, BB=is_data) and (AA=that, BB=is_also_data).
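
    Conceptually, the multi-row parsing described above behaves like the following sketch. Again, Jackson is used only for illustration and the class name is made up; the actual parsing is performed by the sync task itself.

      import com.fasterxml.jackson.databind.JsonNode;
      import com.fasterxml.jackson.databind.ObjectMapper;

      public class MultiRowOutputSketch {
          public static void main(String[] args) throws Exception {
              String value = "{ \"c\": { \"c0\": [ { \"AA\": \"this\", \"BB\": \"is_data\" },"
                      + " { \"AA\": \"that\", \"BB\": \"is_also_data\" } ] } }";
              // Path of array = c.c0: locate the JSON array inside the record value.
              JsonNode array = new ObjectMapper().readTree(value).path("c").path("c0");
              // Each element of the array becomes one output record.
              for (JsonNode element : array) {
                  System.out.println("AA=" + element.path("AA").asText()
                          + ", BB=" + element.path("BB").asText());
              }
              // Prints: AA=this, BB=is_data
              //         AA=that, BB=is_also_data
          }
      }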

  7. Click the Save icon in the toolbar.
