Replicating MySQL to Snowflake with Kafka and Debezium—Part Two: Data Ingestion
by Jose Rodriguez on May 17, 2021 12:00:00 AM
Here we go again
Hello, and welcome to this second part of my “Replicating MySQL to Snowflake” series. If you landed here from a web search and missed part one, you can take a look here: part one.
What’s up?
In this second part, I’ll be demonstrating how to ingest data from Kafka into Snowflake using the Snowflake Connector for Kafka.
In part one I showed the diagram for this architecture and how to implement the first half of it.
 
A reminder on the environment:
- OS: Ubuntu 20.04.2 LTS
- MySQL: Ver 8.0.24 for Linux on x86_64 (MySQL Community Server—GPL)
- ZooKeeper: Apache ZooKeeper, version 3.7.0 2021-03-17 09:46 UTC
- Kafka: 2.8.0
- Scala (included with Kafka): 2.13
- Debezium: 1.5.0.Final
- Snowflake Kafka connector (OSS version): 1.5.2 (Maven)
- Snowflake: Enterprise edition (AWS)
This time I’ll show the second piece of the puzzle: installing and configuring the Snowflake Connector for Kafka and creating the Snowflake objects that ingest the data.
Snowflake target database
For this POC (proof of concept), I used my AWS-hosted trial account, which includes one month of access and 400 USD in credits. I’ve chosen the Enterprise Edition, but a Standard Edition should be good enough for a similar POC.
I created a dedicated database and schema along with a warehouse. This allows me to easily clean up once I’m done. These come with a newly created role and user dedicated solely for data ingestion purposes.
Having dedicated resources for particular activities is a best practice not only for Snowflake but also for any other database you may be working with. It makes things a lot easier to maintain and audit.
I followed the instructions provided by Snowflake to determine the bare minimum grants this new role requires. As you probably know, this is another best practice.
And here’s the script:
use role sysadmin;
create warehouse if not exists wh_ingest warehouse_size = xsmall;
create database if not exists mysql_ingest;
create schema if not exists mysql_ingest.landing;
use role securityadmin;
create role if not exists r_mysql_rep;
grant all on database mysql_ingest to role r_mysql_rep;
grant all on schema mysql_ingest.landing to role r_mysql_rep;
grant all on warehouse wh_ingest to role r_mysql_rep;
create user if not exists mysql_rep identified by 'XXXXXXXXXXXXXXXXX';
grant role r_mysql_rep to user mysql_rep;
grant role r_mysql_rep to role accountadmin;
grant role r_mysql_rep to role sysadmin;
alter user mysql_rep set DEFAULT_WAREHOUSE=wh_ingest;
alter user mysql_rep set DEFAULT_NAMESPACE=mysql_ingest.landing;
alter user mysql_rep set default_role=r_mysql_rep;
That’s it—nothing else is required.
You’ll notice that I haven’t created any tables. While there’s an option to point the connector at an existing table, I stuck to my simplicity motto for this POC and let the connector create the tables for me.
I’ll review the landing table structure later on.
Installing the Snowflake Connector for Kafka
I followed the instructions provided in the Snowflake documentation.
The connector requires Kafka and a JDK (Java Development Kit)—Standard Edition is good enough. These two elements are already installed for the CDC (change data capture) part of the process (see part one of the series) so I won’t install them again.
This POC will fully run on a single virtual machine but a real production scenario may require a different configuration to deal with the higher amount of data to process.
The connector itself is just a JAR (Java archive) file available in the Maven repository: https://mvnrepository.com/artifact/com.snowflake/snowflake-kafka-connector.
Simply download both the JAR and its corresponding MD5 files:
jose@localhost:~$ wget https://repo1.maven.org/maven2/com/snowflake/snowflake-kafka-connector/1.5.2/snowflake-kafka-connector-1.5.2.jar
jose@localhost:~$ wget https://repo1.maven.org/maven2/com/snowflake/snowflake-kafka-connector/1.5.2/snowflake-kafka-connector-1.5.2.jar.md5
Verify that the MD5 sum of the downloaded file matches with the reported MD5 sum stored in the Maven repository.
I’ll be using an encrypted private key for authentication, so I need the Bouncy Castle plugin, available in Maven as well.
jose@localhost:~$ wget https://repo1.maven.org/maven2/org/bouncycastle/bc-fips/1.0.1/bc-fips-1.0.1.jar
jose@localhost:~$ wget https://repo1.maven.org/maven2/org/bouncycastle/bc-fips/1.0.1/bc-fips-1.0.1.jar.md5
jose@localhost:~$ wget https://repo1.maven.org/maven2/org/bouncycastle/bcpkix-fips/1.0.3/bcpkix-fips-1.0.3.jar
jose@localhost:~$ wget https://repo1.maven.org/maven2/org/bouncycastle/bcpkix-fips/1.0.3/bcpkix-fips-1.0.3.jar.md5
Again, don’t forget to check the MD5 sum values. This is your data potentially going out into the wild, wild Internet and you want the maximum protection possible, which starts with installing untainted software.
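The MD5 check can be scripted so it is harder to skip. What follows is only a minimal sketch: verify_md5 is a hypothetical helper name, and it assumes that each .md5 file sits next to its JAR, that the hex digest is the first field in that file, and that md5sum (GNU coreutils, present on Ubuntu) is available.

```shell
# Compare a file's MD5 digest with the digest stored in its .md5 companion.
# verify_md5 is a hypothetical helper, not part of any Snowflake or Kafka tooling.
verify_md5() {
  file=$1
  # Keep only the first field of the .md5 file and normalize it to lowercase.
  expected=$(awk '{print $1; exit}' "$file.md5" | tr 'A-F' 'a-f')
  actual=$(md5sum "$file" | awk '{print $1}')
  if [ "$expected" = "$actual" ]; then
    echo "OK       $file"
  else
    echo "MISMATCH $file" >&2
    return 1
  fi
}
```

Calling verify_md5 snowflake-kafka-connector-1.5.2.jar, and then the same for the two Bouncy Castle JARs, returns a non-zero status on the first mismatch, so the calls can be chained with && before copying anything into the Kafka libs folder.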
Note: Debezium generates JSON (JavaScript Object Notation) data in the Kafka topic so I’m skipping everything Avro-related in the documentation.
All these files must exist in the <kafka_dir>/libs folder so make sure you copy them into there.
The following is from the documentation and it’s important for a production system with a multinode Kafka cluster: “The Kafka Connect framework broadcasts the configuration settings for the Kafka connector from the master node to worker nodes. The configuration settings include sensitive information (specifically, the Snowflake username and private key). Make sure to secure the communication channel between Kafka Connect nodes. For instructions, see the documentation for your Apache Kafka software.”
The connector uses key pair authentication to connect to Snowflake so I’m going to need a 2048-bit (minimum) RSA key pair.
I created an encrypted private key file with the following command:
jose@localhost:~$ openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 aes256 -inform PEM -out rsa_key.p8
Generating RSA private key, 2048 bit long modulus (2 primes)
.............................................................................+++++
......................+++++
e is 65537 (0x010001)
Enter Encryption Password:
Verifying - Enter Encryption Password:
Remove the header, footer and line breaks from the key to use it in the configuration file (snowflake-connector-animals.properties):
jose@localhost:~$ grep -v PRIVATE rsa_key.p8 | sed ':a;N;$!ba;s/\n//g'
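The sed expression is a fairly cryptic idiom: the `:a;N;$!ba` loop pulls the whole input into sed's pattern space so that the final substitution can rewrite every newline in one go. An arguably more readable way to get the key body as a single line is sketched below. pem_body is a hypothetical helper name, and the sketch assumes the only lines to drop are the PEM header and footer, which are recognized by their run of dashes.

```shell
# Print the Base64 body of a PEM file as one line:
# no BEGIN/END lines, no line breaks (and no stray carriage returns).
# pem_body is a hypothetical helper used for illustration only.
pem_body() {
  # "--" stops option parsing so the dashes are treated as the pattern.
  grep -v -- '-----' "$1" | tr -d '\r\n'
}
```

Running pem_body rsa_key.p8 prints the value to paste into the configuration file. The same function works for the public key file.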
Generate the public key out of the private one with the following command:
jose@localhost:~$ openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
Enter pass phrase for rsa_key.p8:
writing RSA key
As before, trim the public key file so it can be used to enable key pair authentication for our Snowflake user:
jose@localhost:~$ grep -v PUBLIC rsa_key.pub | sed ':a;N;$!ba;s/\n//g'
Connect to your Snowflake database and execute the following SQL using the result of the previous command:
alter user mysql_rep set rsa_public_key='---REDACTED---'
This is a POC but these files still grant access to a live Snowflake account so make sure you secure them with proper permissions—typically 600 in Linux—and in a separate folder. Especially if you use Git or similar, don’t include these files in the repository. The Snowflake documentation suggests the use of an externalized secret store like AWS Key Management Service (KMS), Microsoft Azure Key Vault or HashiCorp Vault, which sounds like a very good idea for a production environment, if you ask me.
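A minimal sketch of that lockdown follows. secure_key_dir is a hypothetical helper name, and the sketch assumes the key files use the .p8 and .pub extensions from the commands above and have already been moved into a folder of their own.

```shell
# Restrict a key folder and the key files inside it to the current user.
# secure_key_dir is a hypothetical helper; the folder is whatever you pass in.
secure_key_dir() {
  dir=$1
  # Only the owner may list or enter the folder.
  chmod 700 "$dir" || return 1
  # Key files: read/write for the owner, nothing for group and others.
  find "$dir" -type f \( -name '*.p8' -o -name '*.pub' \) -exec chmod 600 {} +
}
```

For example, secure_key_dir ~/snowflake_keys (the folder name is an assumption) leaves the folder at mode 700 and the keys at 600. If the folder lives anywhere near a Git working tree, listing it in .gitignore is a sensible extra step.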
This is how the user looks in Snowflake:
Connector configuration
A connector configuration file specifies the source tables and corresponding Kafka topics. These tables must reside in the same database and schema in the source system. So, I created a single file for my single replicated table.
The file is stored in the Kafka config directory as snowflake-connector-animals.properties.
name=mysql_animals
connector.class=com.snowflake.kafka.connector.SnowflakeSinkConnector
tasks.max=2
topics=snowflake_source.snowflake_source.animals
buffer.count.records=10000
buffer.flush.time=60
buffer.size.bytes=5000000
snowflake.url.name=https://XXXXXX.ca-central-1.snowflakecomputing.com
snowflake.user.name=mysql_rep
snowflake.private.key=---REDACTED---
snowflake.private.key.passphrase=---REDACTED---
snowflake.database.name=mysql_ingest
snowflake.schema.name=landing
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=com.snowflake.kafka.connector.records.SnowflakeJsonConverter
Warning: Although the documentation clearly states that the HTTPS:// and port number are optional in the snowflake.url.name parameter, this doesn’t seem to be the case. I learned this the hard way after a long hour trying to figure out why my configuration was wrong. So, use the full account URL in the configuration file.
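If the account URL comes from a variable or another script, a small guard can make sure the scheme and port are always present before the value is written to the properties file. This is only a sketch: normalize_snowflake_url is a hypothetical helper name, and it assumes HTTPS on the default port 443. It does not touch the host name itself.

```shell
# Ensure a Snowflake account URL carries an explicit https:// scheme and a port.
# normalize_snowflake_url is a hypothetical helper, shown for illustration only.
normalize_snowflake_url() {
  url=$1
  # Add the scheme when it is missing.
  case "$url" in
    https://*) ;;
    *) url="https://$url" ;;
  esac
  # Drop a single trailing slash, if any.
  url=${url%/}
  # Add the default HTTPS port when the host part has no port.
  hostport=${url#https://}
  case "$hostport" in
    *:*) ;;
    *) url="$url:443" ;;
  esac
  printf '%s\n' "$url"
}
```

The output can be used directly as the value of snowflake.url.name.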
There’s another caveat with Snowflake account URLs. Depending on the cloud provider you’ve chosen, the URL may or may not include a cloud “identifier”. For example, the trial account URL I’m using for this POC includes “aws” when I open it in my browser:
https://XXXXX.ca-central-1.aws.snowflakecomputing.com
This URL returns an error when I remove the “aws” part and try to access my account with it. But—and this is a big but—the URL I have to use to get the connector to work is the one without that piece:
https://XXXXX.ca-central-1.snowflakecomputing.com:443
Yes, exactly, it doesn’t include the “aws” portion. In a different POC I was using a GCP-hosted account and I hit the issue the other way round. My program wouldn’t connect unless I added the “gcp” piece to the URL.
This was quite confusing and a source of long troubleshooting hours for initial setups, at least for me.
For future reference, if you find this error message in the Kafka Connector, just start fiddling with the URL until you make it right. Sorry, I can’t be of more help than that.
Caused by: org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector configuration is invalid and contains the following 3 error(s):
snowflake.url.name: Cannot connect to Snowflake
snowflake.user.name: Cannot connect to Snowflake
snowflake.private.key: Cannot connect to Snowflake
DISCLAIMER: While the issue above was present during my testing, at the moment of finishing this post it was no longer present and the correct URL is the one including the “aws” part. Snowflake deploys new versions every Friday so it may be the case that they changed something that has now made this behavior consistent. I decided to leave the information in the post in case a similar issue arose somewhere else.
While I’m using a single Kafka installation for both the Debezium (CDC) and the Snowflake connectors I need different configuration files to avoid port collisions.
So I created a standalone connector configuration file connect-standalone-write.properties as a copy of connect-standalone.properties adding a custom rest.port of 8084. This isn’t a port we’ll be using in this POC but I had to change it anyway.
jose@localhost:~$ grep -v ^# ./kafka_2.13-2.8.0/config/connect-standalone-write.properties
bootstrap.servers=localhost:9092
rest.port=8084
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect_write.offsets
offset.flush.interval.ms=10000
plugin.path=/Pythian/Pythian-internal/SnowFlake_dev/MySQL-SF-replication/kafka-plugins
topic.creation.enable=true
As I explained in part one, all the connectors brought up in a single Kafka deployment will share the same log file, making it very difficult to troubleshoot any issues.
To avoid this, I use the following command to direct the connector output to a log file of my choosing:
jose@localhost:~$ nohup ./kafka_2.13-2.8.0/bin/connect-standalone.sh ./kafka_2.13-2.8.0/config/connect-standalone-write.properties ./kafka_2.13-2.8.0/config/snowflake-connector-animals.properties > snowflake_connector_`date "+%F_%H-%M"`.log 2>&1 &
And this is the final output expected:
[2021-04-28 13:51:07,698] INFO Started o.e.j.s.ServletContextHandler@30ec7d21{/,null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:916)
[2021-04-28 13:51:07,699] INFO REST resources initialized; server is started and ready to handle requests (org.apache.kafka.connect.runtime.rest.RestServer:319)
[2021-04-28 13:51:07,699] INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect:57)
Showing off (AKA testing)
After all this is set up it’s time to demonstrate how the end-to-end replication works.
In part one I showed what a Kafka message looks like when the CDC starts up for the first time and Debezium collects a snapshot of the tables to be replicated.
Beware of this behavior in heavily used production systems as the overall performance of the system may be impacted.
I have the following list of commands to start the replication end to end. I don’t call this a script because it can’t be executed as-is to start the whole system, due to the time it takes for Kafka to be up and running. So, if you run it and hit some weird error messages, just give Kafka a few minutes to finish starting up, verify that it is up and running, and then start the connectors.
#start replication pieces in order
# Start zookeeper
sudo ./apache-zookeeper-3.7.0-bin/bin/zkServer.sh start
# Test it is up and running
# apache-zookeeper-3.7.0-bin/bin/zkCli.sh -server 127.0.0.1:2181
# Start Kafka broker
./kafka_2.13-2.8.0/bin/kafka-server-start.sh -daemon ./kafka_2.13-2.8.0/config/server.properties
# List topics as validation
# kafka_2.13-2.8.0/bin/kafka-topics.sh --list --zookeeper localhost:2181
# Start Debezium MySQL connector - Not using the -daemon option to get different log files.
nohup ./kafka_2.13-2.8.0/bin/connect-standalone.sh ./kafka_2.13-2.8.0/config/connect-standalone.properties ./kafka_2.13-2.8.0/config/mysql-debezium.properties > debezium_connector_`date "+%F_%H-%M"`.log 2>&1 &
# Start Snowflake Kafka connector
nohup ./kafka_2.13-2.8.0/bin/connect-standalone.sh ./kafka_2.13-2.8.0/config/connect-standalone-write.properties ./kafka_2.13-2.8.0/config/snowflake-connector-animals.properties > snowflake_connector_`date "+%F_%H-%M"`.log 2>&1 &
# View Kafka topic contents
# ./kafka_2.13-2.8.0/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic snowflake_source.snowflake_source.animals
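One way to turn the "give Kafka a few minutes" advice into something scriptable is to poll the broker port before starting the connectors. The sketch below is not part of the original command list: wait_for_port is a hypothetical helper, it relies on bash's /dev/tcp feature, and it assumes the broker listens on localhost:9092 as in the bootstrap.servers setting. An open port only shows that the broker accepts connections, not that it has finished all of its startup work, so treat it as a first gate rather than a guarantee.

```shell
# Wait until host:port accepts TCP connections, or give up after N seconds.
# Requires bash: /dev/tcp is a bash feature, not a real device file.
# wait_for_port is a hypothetical helper used for illustration only.
wait_for_port() {
  host=$1
  port=$2
  timeout=${3:-120}
  elapsed=0
  while [ "$elapsed" -lt "$timeout" ]; do
    # The subshell opens a test connection and closes it again on exit.
    if (exec 3<>"/dev/tcp/$host/$port") 2>/dev/null; then
      return 0
    fi
    sleep 1
    elapsed=$((elapsed + 1))
  done
  return 1
}
```

With this in place, the two connector commands could be guarded with something like wait_for_port localhost 9092 300 || exit 1 so they only run once the broker answers.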
Once everything’s running, we can see the following messages in the Debezium connector log file:
[2021-05-03 10:43:33,423] INFO WorkerSourceTask{id=mysql-connector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:487)
They basically say that there’s no work to be done.
There will be references to the initial snapshot being made if a new table has been added to the replication.
In the Snowflake Connector we can see how the new connector is created, along with the pipes, stages and landing tables corresponding to each of the source tables:
(...)
[2021-05-03 10:02:07,225] INFO Finished creating connector mysql_animals (org.apache.kafka.connect.runtime.Worker:310)
(...)
[2021-05-03 10:02:09,098] INFO [SF_KAFKA_CONNECTOR] initialized the pipe connector for pipe mysql_ingest.landing.SNOWFLAKE_KAFKA_CONNECTOR_mysql_animals_PIPE_snowflake_source_snowflake_source_puppies_741515570_0 (com.snowflake.kafka.conne
[2021-05-03 10:02:09,100] INFO (...)
[2021-05-03 10:02:10,599] INFO [SF_KAFKA_CONNECTOR] Creating new stage SNOWFLAKE_KAFKA_CONNECTOR_mysql_animals_STAGE_snowflake_source_snowflake_source_puppies_741515570. (com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1:79)
(...)
[2021-05-03 10:02:11,915] INFO [SF_KAFKA_CONNECTOR] Creating new table snowflake_source_snowflake_source_animals_106896695. (com.snowflake.kafka.connector.internal.SnowflakeSinkServiceV1:79)
And the actual objects as seen from Snowflake. The funny names are automatically generated by the connector.
mysql_rep#WH_INGEST@MYSQL_INGEST.LANDING>show PIPES;
| created_on | name | database_name | schema_name | definition | owner | notification_channel | comment | integration | pattern |
| 2021-05-03 01:02:13.943 -0700 | SNOWFLAKE_KAFKA_CONNECTOR_MYSQL_ANIMALS_PIPE_SNOWFLAKE_SOURCE_SNOWFLAKE_SOURCE_ANIMALS_106896695_0 | MYSQL_INGEST | LANDING | copy into snowflake_source_snowflake_source_animals_106896695(RECORD_METADATA, RECORD_CONTENT) from (select $1:meta, $1:content from @SNOWFLAKE_KAFKA_CONNECTOR_mysql_animals_STAGE_snowflake_source_snowflake_source_animals_106896695 t) file_format = (type = 'json') | R_MYSQL_REP | NULL | | NULL | NULL |
1 Row(s) produced. Time Elapsed: 1.037s

mysql_rep#WH_INGEST@MYSQL_INGEST.LANDING>show STAGES;
| created_on | name | database_name | schema_name | url | has_credentials | has_encryption_key | owner | comment | region | type | cloud | notification_channel | storage_integration |
| 2021-05-03 01:02:13.413 -0700 | SNOWFLAKE_KAFKA_CONNECTOR_MYSQL_ANIMALS_STAGE_SNOWFLAKE_SOURCE_SNOWFLAKE_SOURCE_ANIMALS_106896695 | MYSQL_INGEST | LANDING | | N | N | R_MYSQL_REP | | NULL | INTERNAL | NULL | NULL | NULL |
1 Row(s) produced. Time Elapsed: 0.890s

mysql_rep#WH_INGEST@MYSQL_INGEST.LANDING>show TABLES;
| created_on | name | database_name | schema_name | kind | comment | cluster_by | rows | bytes | owner | retention_time | automatic_clustering | change_tracking | search_optimization | search_optimization_progress | search_optimization_bytes | is_external |
| 2021-05-03 01:02:12.769 -0700 | SNOWFLAKE_SOURCE_SNOWFLAKE_SOURCE_ANIMALS_106896695 | MYSQL_INGEST | LANDING | TABLE | | | 10 | 34816 | R_MYSQL_REP | 1 | OFF | OFF | OFF | NULL | NULL | N |
1 Row(s) produced. Time Elapsed: 1.136s
Now here comes the real thing.
I start with a table with a few rows in it:
jose@localhost:[snowflake_source]> select * from animals;
+----+---------+
| id | name    |
+----+---------+
|  1 | dog     |
|  2 | cat     |
|  3 | penguin |
|  4 | lax     |
|  5 | whale   |
|  6 | ostrich |
|  7 | newt    |
|  8 | snake   |
|  9 | frog    |
| 10 | dragon  |
+----+---------+
10 rows in set (0,00 sec)

jose@localhost:[snowflake_source]> insert into animals (name) values ('lizard');
Query OK, 1 row affected (0,00 sec)

jose@localhost:[snowflake_source]> select * from animals;
+----+---------+
| id | name    |
+----+---------+
|  1 | dog     |
|  2 | cat     |
|  3 | penguin |
|  4 | lax     |
|  5 | whale   |
|  6 | ostrich |
|  7 | newt    |
|  8 | snake   |
|  9 | frog    |
| 10 | dragon  |
| 11 | lizard  |
+----+---------+
11 rows in set (0,00 sec)
Which is immediately captured by the MySQL connector as shown in the log:
[2021-05-03 10:58:36,158] INFO 1 records sent during previous 00:57:03.353, last recorded offset: {transaction_id=null, ts_sec=1620032315, file=binlog.000058, pos=235, row=1, server_id=1, event=2} (io.debezium.connector.common.BaseSourceTask:182)
And sent over to Snowflake by the writer:
[2021-05-03 10:58:43,367] INFO Created Insert Request : https://XXXXXX.west-us-2.azure.snowflakecomputing.com:443/v1/data/pipes/mysql_ingest.landing.SNOWFLAKE_KAFKA_CONNECTOR_mysql_animals_PIPE_snowflake_source_snowflake_source_animals_106896695_0/insertFiles?requestId=0f10041c-817c-4cd1-a6ce-bc1ef1b609ec&showSkippedFiles=false (net.snowflake.ingest.connection.RequestBuilder:471)
The Kafka message is JSON and contains two main parts: schema and payload. Because this event is an insert, the “before” entry in the payload is null. An update also carries the previous row values in “before”, which is useful for further processing once the data is in Snowflake.
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          { "type": "int32", "optional": false, "field": "id" },
          { "type": "string", "optional": false, "field": "name" }
        ],
        "optional": true,
        "name": "snowflake_source.snowflake_source.animals.Value",
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          { "type": "int32", "optional": false, "field": "id" },
          { "type": "string", "optional": false, "field": "name" }
        ],
        "optional": true,
        "name": "snowflake_source.snowflake_source.animals.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          { "type": "string", "optional": false, "field": "version" },
          { "type": "string", "optional": false, "field": "connector" },
          { "type": "string", "optional": false, "field": "name" },
          { "type": "int64", "optional": false, "field": "ts_ms" },
          { "type": "string", "optional": true, "name": "io.debezium.data.Enum", "version": 1, "parameters": { "allowed": "true,last,false" }, "default": "false", "field": "snapshot" },
          { "type": "string", "optional": false, "field": "db" },
          { "type": "string", "optional": true, "field": "sequence" },
          { "type": "string", "optional": true, "field": "table" },
          { "type": "int64", "optional": false, "field": "server_id" },
          { "type": "string", "optional": true, "field": "gtid" },
          { "type": "string", "optional": false, "field": "file" },
          { "type": "int64", "optional": false, "field": "pos" },
          { "type": "int32", "optional": false, "field": "row" },
          { "type": "int64", "optional": true, "field": "thread" },
          { "type": "string", "optional": true, "field": "query" }
        ],
        "optional": false,
        "name": "io.debezium.connector.mysql.Source",
        "field": "source"
      },
      { "type": "string", "optional": false, "field": "op" },
      { "type": "int64", "optional": true, "field": "ts_ms" },
      {
        "type": "struct",
        "fields": [
          { "type": "string", "optional": false, "field": "id" },
          { "type": "int64", "optional": false, "field": "total_order" },
          { "type": "int64", "optional": false, "field": "data_collection_order" }
        ],
        "optional": true,
        "field": "transaction"
      }
    ],
    "optional": false,
    "name": "snowflake_source.snowflake_source.animals.Envelope"
  },
  "payload": {
    "before": null,
    "after": { "id": 11, "name": "lizard" },
    "source": {
      "version": "1.5.0.Final",
      "connector": "mysql",
      "name": "snowflake_source",
      "ts_ms": 1620032315000,
      "snapshot": "false",
      "db": "snowflake_source",
      "sequence": null,
      "table": "animals",
      "server_id": 1,
      "gtid": null,
      "file": "binlog.000058",
      "pos": 395,
      "row": 0,
      "thread": null,
      "query": null
    },
    "op": "c",
    "ts_ms": 1620032315737,
    "transaction": null
  }
}
The connector writes each message into a landing table with two columns. The message itself, schema and payload included, goes into RECORD_CONTENT:
mysql_rep#WH_INGEST@MYSQL_INGEST.LANDING>desc TABLE SNOWFLAKE_SOURCE_SNOWFLAKE_SOURCE_ANIMALS_106896695;
+-----------------+---------+--------+-------+---------+-------------+------------+-------+------------+---------+-------------+
| name            | type    | kind   | null? | default | primary key | unique key | check | expression | comment | policy name |
|-----------------+---------+--------+-------+---------+-------------+------------+-------+------------+---------+-------------|
| RECORD_METADATA | VARIANT | COLUMN | Y     | NULL    | N           | N          | NULL  | NULL       | NULL    | NULL        |
| RECORD_CONTENT  | VARIANT | COLUMN | Y     | NULL    | N           | N          | NULL  | NULL       | NULL    | NULL        |
+-----------------+---------+--------+-------+---------+-------------+------------+-------+------------+---------+-------------+
2 Row(s) produced. Time Elapsed: 1.111s
As you can see, the data type for both columns is VARIANT as expected when inserting JSON data into Snowflake.
This data type requires some fancy SQL to be used to extract relevant information. A very basic query using the FLATTEN function follows:
select f.path, f.value
from "MYSQL_INGEST"."LANDING"."SNOWFLAKE_SOURCE_SNOWFLAKE_SOURCE_ANIMALS_106896695" p,
lateral flatten(input => p.RECORD_CONTENT:payload, recursive => true) f
where f.seq=2
Which returns, in my case, the following information:
| PATH | VALUE | 
| after | { “id”: 11, “name”: “lizard” } | 
| after.id | 11 | 
| after.name | lizard | 
| before | null | 
| op | r | 
| source | { “connector”: “mysql”, “db”: “snowflake_source”, “file”: “binlog.000058”, … | 
| source.connector | mysql | 
| source.db | snowflake_source | 
| source.file | binlog.000058 | 
| source.gtid | null | 
| source.name | snowflake_source | 
| source.pos | 156 | 
| source.query | null | 
| source.row | 0 | 
| source.sequence | null | 
| source.server_id | 0 | 
| source.snapshot | true | 
| source.table | animals | 
| source.thread | null | 
| source.ts_ms | 1620028850302 | 
| source.version | 1.5.0.Final | 
| transaction | null | 
| ts_ms | 1620028850310 | 
And that’s it
Well, not really. I haven’t covered DDL (data definition language) which isn’t currently supported out of the box. Handling DDL isn’t trivial and different approaches have been proposed.
I also haven’t provided any ideas on how to leverage the ingested data, but a very simple way would be to create a stream on the landing table and a task that runs when new CDC data arrives, populating a slowly changing dimension-like table. Should you need such help, don’t hesitate to reach out to our sales team.
Finally, if you read all the way to the end, thank you and don’t forget to comment if you find any issues or something is missing or wrong.
Enjoy!
- DeepSeek (1)
- Docker-Composer (1)
- Duet AI (1)
- Edp (1)
- Etl (1)
- Gcp Compute (1)
- Gcp-Spanner (1)
- Global Analytics (1)
- Google Analytics (1)
- Google Cloud Architecture Framework (1)
- Google Cloud Data Services (1)
- Google Cloud Partner (1)
- Google Cloud Spanner (1)
- Google Cloud VMware Engine (1)
- Google Compute Engine (1)
- Google Dataflow (1)
- Google Datalab (1)
- Google Grab And Go (1)
- Graph Algorithms (1)
- Graph Databases (1)
- Graph Inferences (1)
- Graph Theory (1)
- GraphQL (1)
- Health Check (1)
- Healthcheck (1)
- Information (1)
- Infrastructure As A Code (1)
- Innobackupex (1)
- Innodb Concurrency (1)
- Innodb Flush Method (1)
- It Industry (1)
- Kubeflow (1)
- LMSYS Chatbot Arena (1)
- Linux Host Monitoring (1)
- Linux Storage Appliance (1)
- Looker (1)
- MMLU (1)
- Managed Services (1)
- Migrate (1)
- Migration Checklist (1)
- MongoDB Atlas (1)
- MongoDB Compass (1)
- Newsroom (1)
- Nifi (1)
- OPEX (1)
- Odbcs (1)
- Odbs (1)
- On-Premises (1)
- Ora-01852 (1)
- Ora-7445 (1)
- Oracle Cursor (1)
- Oracle Database@Google Cloud (1)
- Oracle Exadata Smart Scan (1)
- Oracle Licensing (1)
- Oracle Linux Virtualization Manager (1)
- Oracle Oda (1)
- Oracle Openworld (1)
- Oracle Parallelism (1)
- Oracle RMAN (1)
- Oracle Rdbms (1)
- Oracle Real Application Clusters (1)
- Oracle Reports (1)
- Oracle Security (1)
- Perfomrance (1)
- Performance Schema (1)
- Policy (1)
- Prompt Engineering (1)
- Public Cloud (1)
- Pythian News (1)
- Rdb (1)
- Replication Error (1)
- Retail (1)
- Securing Sql Server (1)
- Serverless Computing (1)
- Sso (1)
- Tenserflow (1)
- Teradata (1)
- Vertex AI (1)
- Videos (1)
- Workspace Security (1)
- Xbstream (1)