
Robin Moffatt

Originally published at rmoff.net

🎄 Twelve Days of SMT 🎄 - Day 3: Flatten

The Flatten Single Message Transform (SMT) is useful when you need to collapse a nested message down to a flat structure.

To use the Single Message Transform you only need to reference it; there’s no additional configuration required:

"transforms"              : "flatten",
"transforms.flatten.type" : "org.apache.kafka.connect.transforms.Flatten$Value"

You can optionally override the default delimiter (.) that’s used:

"transforms.flatten.delimiter" : "_" 
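
To make the effect of the delimiter concrete, here is a minimal Python sketch of what flattening does to field names. It is a simplified model and not the SMT’s actual implementation: the function name `flatten` and the sample record are illustrative, and the sketch assumes a schemaless record made of nested dictionaries, leaving every other value untouched.

```python
from typing import Any, Dict


def flatten(record: Dict[str, Any], delimiter: str = ".", prefix: str = "") -> Dict[str, Any]:
    """Collapse nested dicts into one level, joining field names with `delimiter`."""
    flat: Dict[str, Any] = {}
    for name, value in record.items():
        full_name = f"{prefix}{delimiter}{name}" if prefix else name
        if isinstance(value, dict):
            # Recurse into the nested structure, carrying the parent name as the prefix.
            flat.update(flatten(value, delimiter, full_name))
        else:
            # Leaf values (strings, numbers, lists, None) are copied unchanged.
            flat[full_name] = value
    return flat


# Illustrative record, not taken from a real topic.
record = {
    "FULL_NAME": "Opossum, american virginia",
    "ADDRESS": {"STREET": "20 Acker Terrace", "CITY": "Lynchburg"},
}

print(list(flatten(record)))                 # ['FULL_NAME', 'ADDRESS.STREET', 'ADDRESS.CITY']
print(list(flatten(record, delimiter="_")))  # ['FULL_NAME', 'ADDRESS_STREET', 'ADDRESS_CITY']
```

An underscore is usually the more convenient choice when the target is a database, since a dot in a column name generally has to be quoted.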

👾 Demo code

Example - JDBC Sink connector

See also 🎥 Kafka Connect in Action : JDBC Sink (👾 demo code) and 🎥 ksqlDB & Kafka Connect JDBC Sink in action (👾 demo code)

Given a source message that looks like this:

{
  "FULL_NAME": "Opossum, american virginia",
  "ADDRESS": {
    "STREET": "20 Acker Terrace",
    "CITY": "Lynchburg",
    "COUNTY_OR_STATE": "Virginia",
    "ZIP_OR_POSTCODE": "24515"
  }
}

We can’t load it directly into a database table, because a table row is flat and the nested ADDRESS field has no column type to map to. If we try to load the message as it is, the JDBC Sink connector fails with an error:

…(STRUCT) type doesn't have a mapping to the SQL database column type

So we use the Single Message Transform to flatten the source payload:

curl -i -X PUT -H "Accept:application/json" \
    -H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day3-customers-00/config \
    -d '{
        "connector.class"              : "io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url"               : "jdbc:mysql://mysql:3306/demo",
        "connection.user"              : "mysqluser",
        "connection.password"          : "mysqlpw",
        "topics"                       : "day3-customers",
        "tasks.max"                    : "4",
        "auto.create"                  : "true",
        "auto.evolve"                  : "true",
        "transforms"                   : "flatten",
        "transforms.flatten.type"      : "org.apache.kafka.connect.transforms.Flatten$Value",
        "transforms.flatten.delimiter" : "_"
    }'
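
If you’d rather submit the configuration from a script than from curl, the same REST call can be sketched in Python with only the standard library. This is a sketch under assumptions: the helper name `build_config_request` is made up for illustration, and the URL, connector name, and settings are simply copied from the curl call above. The request is only built here, not sent.

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083"  # Kafka Connect REST endpoint used in the curl call


def build_config_request(name: str, config: dict) -> urllib.request.Request:
    """Build (but do not send) the PUT /connectors/<name>/config request."""
    return urllib.request.Request(
        url=f"{CONNECT_URL}/connectors/{name}/config",
        data=json.dumps(config).encode("utf-8"),
        headers={"Accept": "application/json", "Content-Type": "application/json"},
        method="PUT",
    )


# Same settings as the curl example, expressed as a Python dict.
sink_config = {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:mysql://mysql:3306/demo",
    "connection.user": "mysqluser",
    "connection.password": "mysqlpw",
    "topics": "day3-customers",
    "tasks.max": "4",
    "auto.create": "true",
    "auto.evolve": "true",
    "transforms": "flatten",
    "transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
    "transforms.flatten.delimiter": "_",
}

request = build_config_request("sink-jdbc-mysql-day3-customers-00", sink_config)
print(request.get_method(), request.full_url)

# To send it to a running Connect worker:
#     with urllib.request.urlopen(request) as response:
#         print(response.status)
```

Whichever client submits it, the connector configuration itself is identical to the one in the curl call.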

This will work, and you can now see the data in MySQL:

mysql> describe `day3-customers`;
+-------------------------+------+------+-----+---------+-------+
| Field                   | Type | Null | Key | Default | Extra |
+-------------------------+------+------+-----+---------+-------+
| FULL_NAME               | text | YES  |     | NULL    |       |
| ADDRESS_STREET          | text | YES  |     | NULL    |       |
| ADDRESS_CITY            | text | YES  |     | NULL    |       |
| ADDRESS_COUNTY_OR_STATE | text | YES  |     | NULL    |       |
| ADDRESS_ZIP_OR_POSTCODE | text | YES  |     | NULL    |       |
+-------------------------+------+------+-----+---------+-------+
5 rows in set (0.00 sec)

mysql> select * from `day3-customers`;
+----------------------------+-----------------------------+--------------+-------------------------+-------------------------+
| FULL_NAME                  | ADDRESS_STREET              | ADDRESS_CITY | ADDRESS_COUNTY_OR_STATE | ADDRESS_ZIP_OR_POSTCODE |
+----------------------------+-----------------------------+--------------+-------------------------+-------------------------+
| Opossum, american virginia | 20 Acker Terrace            | Lynchburg    | Virginia                | 24515                   |
| Red deer                   | 53 Basil Terrace            | Lexington    | Kentucky                | 40515                   |
| Laughing kookaburra        | 84 Monument Alley           | San Jose     | California              | 95113                   |
| American bighorn sheep     | 326 Sauthoff Crossing       | San Antonio  | Texas                   | 78296                   |
| Skua, long-tailed          | 7 Laurel Terrace            | Manassas     | Virginia                | 22111                   |
| Fox, bat-eared             | 2946 Daystar Drive          | Jamaica      | New York                | 11431                   |
| Greater rhea               | 97 Morning Way              | Charleston   | West Virginia           | 25331                   |
| Vervet monkey              | 7615 Brown Park             | Chicago      | Illinois                | 60681                   |
| White spoonbill            | 7 Fulton Parkway            | Asheville    | North Carolina          | 28805                   |
| Sun gazer                  | 61 Lakewood Gardens Parkway | Pensacola    | Florida                 | 32590                   |
+----------------------------+-----------------------------+--------------+-------------------------+-------------------------+
10 rows in set (0.00 sec)

Here’s how to add the Kafka message key to the target table as its primary key:

curl -i -X PUT -H "Accept:application/json" \
    -H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day3-customers-02/config \
    -d '{
        "connector.class"              : "io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url"               : "jdbc:mysql://mysql:3306/demo",
        "connection.user"              : "mysqluser",
        "connection.password"          : "mysqlpw",
        "topics"                       : "day3-customers2",
        "tasks.max"                    : "4",
        "auto.create"                  : "true",
        "auto.evolve"                  : "true",
        "transforms"                   : "flatten",
        "transforms.flatten.type"      : "org.apache.kafka.connect.transforms.Flatten$Value",
        "transforms.flatten.delimiter" : "_",
        "pk.mode"                      : "record_key",
        "pk.fields"                    : "id",
        "key.converter"                : "org.apache.kafka.connect.converters.LongConverter"
    }'
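
The three extra settings work together: the key converter turns the raw key bytes into a number, and pk.mode plus pk.fields tell the sink to store that number in a column called id. The Python sketch below is a rough model of that idea, not the connector’s code. The helper names, the key value 42, and the sample row are made up, and it assumes the key was written as an 8-byte big-endian long, which is the layout Kafka’s long serializer uses.

```python
import struct
from typing import Any, Dict


def decode_long_key(raw_key: bytes) -> int:
    """Decode an 8-byte big-endian signed integer key."""
    (value,) = struct.unpack(">q", raw_key)
    return value


def build_row(raw_key: bytes, flat_value: Dict[str, Any], pk_field: str = "id") -> Dict[str, Any]:
    """Model the sink row: the record key becomes the `pk_field` column next to the value fields."""
    return {pk_field: decode_long_key(raw_key), **flat_value}


raw_key = struct.pack(">q", 42)  # hypothetical key, as a producer would serialize it
flat_value = {"FULL_NAME": "Red deer", "ADDRESS_CITY": "Lexington"}  # already-flattened value

print(build_row(raw_key, flat_value))
```

If the key on your topic is a string or a struct instead of a long, the key converter and pk.fields would need to change to match.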
