Labels: blocker, bug, dialect (Dialect for the JDBC source and/or sink), mssql, mysql
I can't get `numeric.mapping` to work with MySQL and Confluent Platform 5.1. Steps to reproduce below.
Create MySQL table:
```sql
use demo;

create table transactions (
    txn_id INT,
    customer_id INT,
    amount DECIMAL(5,2),
    currency VARCHAR(50),
    txn_timestamp VARCHAR(50)
);

insert into transactions (txn_id, customer_id, amount, currency, txn_timestamp)
values (3, 2, 17.13, 'EUR', '2018-04-30T21:30:39Z');
```
Inspect table:
```
mysql> describe transactions;
+---------------+--------------+------+-----+---------+-------+
| Field         | Type         | Null | Key | Default | Extra |
+---------------+--------------+------+-----+---------+-------+
| txn_id        | int(11)      | YES  |     | NULL    |       |
| customer_id   | int(11)      | YES  |     | NULL    |       |
| amount        | decimal(5,2) | YES  |     | NULL    |       |
| currency      | varchar(50)  | YES  |     | NULL    |       |
| txn_timestamp | varchar(50)  | YES  |     | NULL    |       |
+---------------+--------------+------+-----+---------+-------+
5 rows in set (0.00 sec)
```
Create connector:

```bash
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
    "name": "jdbc_source_mysql_12a",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url": "jdbc:mysql://mysql:3306/demo",
        "connection.user": "connect_user",
        "connection.password": "asgard",
        "topic.prefix": "mysql-12a-",
        "numeric.mapping": "best_fit",
        "table.whitelist": "demo.transactions",
        "mode": "bulk",
        "poll.interval.ms": 3600000
    }
}'
```
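To confirm the connector and its task are actually running before checking the data, the standard Connect REST status endpoint can be used (a quick sketch, assuming the same worker on localhost:8083 as above):

```bash
# Check connector and task state (RUNNING vs FAILED);
# GET /connectors/{name}/status is part of the Kafka Connect REST API
curl -s "http://localhost:8083/connectors/jdbc_source_mysql_12a/status" | jq '.'
```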
Even though `"numeric.mapping": "best_fit"` is set, Kafka Connect stores the `DECIMAL(5,2)` as a `Decimal`, serialised to bytes in Avro:
```bash
$ curl -s "http://localhost:8081/subjects/mysql-12a-transactions-value/versions/1" | jq '.schema|fromjson.fields[] | select (.name == "amount")'
{
  "name": "amount",
  "type": [
    "null",
    {
      "type": "bytes",
      "scale": 2,
      "precision": 64,
      "connect.version": 1,
      "connect.parameters": {
        "scale": "2"
      },
      "connect.name": "org.apache.kafka.connect.data.Decimal",
      "logicalType": "decimal"
    }
  ],
  "default": null
}
```
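To see what consumers actually get for the `amount` field, the topic can be read back through the Avro console consumer (a sketch; the broker address localhost:9092 is an assumption):

```bash
# Read the topic back with Avro deserialisation to inspect how the
# amount field comes out on the wire
kafka-avro-console-consumer \
    --bootstrap-server localhost:9092 \
    --property schema.registry.url=http://localhost:8081 \
    --topic mysql-12a-transactions \
    --from-beginning
```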
Connect Worker log excerpt:
```
INFO Kafka version : 2.1.0-cp1 (org.apache.kafka.common.utils.AppInfoParser)
…
INFO JdbcSourceTaskConfig values:
    batch.max.rows = 100
    catalog.pattern = null
    connection.attempts = 3
    connection.backoff.ms = 10000
    connection.password = [hidden]
    connection.url = jdbc:mysql://mysql:3306/demo
    connection.user = connect_user
    dialect.name =
    incrementing.column.name =
    mode = bulk
    numeric.mapping = best_fit
    numeric.precision.mapping = false
    poll.interval.ms = 3600000
    query =
    schema.pattern = null
    table.blacklist = []
    table.poll.interval.ms = 60000
    table.types = [TABLE]
    table.whitelist = [demo.transactions]
    tables = [`demo`.`transactions`]
    timestamp.column.name = []
    timestamp.delay.interval.ms = 0
    topic.prefix = mysql-12a-
    validate.non.null = true
 (io.confluent.connect.jdbc.source.JdbcSourceTaskConfig)
…
DEBUG Checking for next block of results from BulkTableQuerier{table='"demo"."transactions"', query='null', topicPrefix='mysql-12a-'} (io.confluent.connect.jdbc.source.JdbcSourceTask)
DEBUG BulkTableQuerier{table='"demo"."transactions"', query='null', topicPrefix='mysql-12a-'} prepared SQL query: SELECT * FROM `demo`.`transactions` (io.confluent.connect.jdbc.source.BulkTableQuerier)
DEBUG DECIMAL with precision: '5' and scale: '2' (io.confluent.connect.jdbc.dialect.MySqlDatabaseDialect)
DEBUG DECIMAL with precision: '5' and scale: '2' (io.confluent.connect.jdbc.dialect.MySqlDatabaseDialect)
DEBUG Returning 100 records for BulkTableQuerier{table='"demo"."transactions"', query='null', topicPrefix='mysql-12a-'} (io.confluent.connect.jdbc.source.JdbcSourceTask)
…
kafka-connect_1_8eb73e80dda1 | [2019-01-07 13:37:50,920] DEBUG Sending POST with input {"schema":"{\"type\":\"record\",\"name\":\"transactions\",\"fields\":[{\"name\":\"txn_id\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"customer_id\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"amount\",\"type\":[\"null\",{\"type\":\"bytes\",\"scale\":2,\"precision\":64,\"connect.version\":1,\"connect.parameters\":{\"scale\":\"2\"},\"connect.name\":\"org.apache.kafka.connect.data.Decimal\",\"logicalType\":\"decimal\"}],\"default\":null},{\"name\":\"currency\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"txn_timestamp\",\"type\":[\"null\",\"string\"],\"default\":null}],\"connect.name\":\"transactions\"}"} to http://schema-registry:8081/subjects/mysql-12a-transactions-value/versions (io.confluent.kafka.schemaregistry.client.rest.RestService)
```
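To isolate the dialect's DECIMAL handling in a busy worker log, something like the following works (a sketch; the container name is taken from the log prefix above, and assumes the worker runs under Docker):

```bash
# Pull just the MySqlDatabaseDialect DECIMAL lines out of the worker log
docker logs kafka-connect_1_8eb73e80dda1 2>&1 | grep "DECIMAL with precision"
```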
I've tried this with three different settings (cycling through them as sketched after the list), and each still results in the `amount` field being serialised to bytes in Avro:

- `"numeric.mapping": "best_fit"`
- `"numeric.mapping": "precision_only"`
- `"numeric.precision.mapping": true`
Per the docs I am expecting to see the `DECIMAL(5,2)` serialised to Avro `FLOAT64` (I think; at the very least, not `bytes`).
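For comparison, this is roughly the field schema I'd expect the jq query above to return if `best_fit` were working, with Connect's `FLOAT64` landing in Avro as a plain `double` (my expectation, not observed output):

```json
{
  "name": "amount",
  "type": ["null", "double"],
  "default": null
}
```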