DEV Community

Scott Wang


MySQL 8 Kafka Connect Tutorial on Docker

In this tutorial, we will use docker-compose and MySQL 8 to demonstrate the Kafka JDBC source connector, with MySQL as the data source.

This tutorial is mainly based on the Kafka Connect Tutorial on Docker. However, that original tutorial is outdated and simply won't work if you follow it step by step. This is a refresh of that tutorial; to simplify things, we also drop the Avro setup, since it serves no purpose in demonstrating Kafka Connect. Enough said, let's begin.

Preparation

First, you will need to download the MySQL Connector driver; it can be found here:
MySQL Connector Driver

You will also need to download the JDBC plugin:
Confluent JDBC plugins

Unzip both mysql-connector-java-8.0.22.tar.gz and confluentinc-kafka-connect-jdbc-10.0-2.1.zip. Create a jars directory, then move mysql-connector-java-8.0.22.jar and all the .jar files in the confluentinc-kafka-connect-jdbc-10.0-2.1/lib/ directory into the jars directory.
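These preparation steps can be sketched as a small shell script (the archive names match the downloads above; if the archives are not in the current directory, the script just creates the empty jars directory):

```shell
#!/bin/sh
# Collect the MySQL driver jar and the Confluent JDBC plugin jars into ./jars,
# the directory that the docker-compose file below mounts into the Connect container.
mkdir -p jars

if [ -f mysql-connector-java-8.0.22.tar.gz ]; then
  tar xzf mysql-connector-java-8.0.22.tar.gz
  cp mysql-connector-java-8.0.22/mysql-connector-java-8.0.22.jar jars/
fi

if [ -f confluentinc-kafka-connect-jdbc-10.0-2.1.zip ]; then
  unzip -o confluentinc-kafka-connect-jdbc-10.0-2.1.zip
  cp confluentinc-kafka-connect-jdbc-10.0-2.1/lib/*.jar jars/
fi

# List what ended up in the plugin directory.
ls jars
```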

docker-compose file

Here is the docker-compose file that contains everything you need to run this tutorial

```yaml
version: '2'
services:
  mysql:
    privileged: true
    ports:
      - 3306:3306
    environment:
      MYSQL_ROOT_PASSWORD: test
    image: mysql:8.0
  zookeeper:
    image: confluentinc/cp-zookeeper:5.0.0
    privileged: true
    ports:
      - 32181:32181
    environment:
      ZOOKEEPER_CLIENT_PORT: 32181
      ZOOKEEPER_TICK_TIME: 2000
  kafka:
    image: confluentinc/cp-kafka:5.0.0
    ports:
      - 29092:29092
    links:
      - zookeeper
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:32181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  kafka-connector-mysql:
    image: confluentinc/cp-kafka-connect:latest
    ports:
      - 28083:28083
    links:
      - kafka
      - zookeeper
      - mysql
    environment:
      CONNECT_BOOTSTRAP_SERVERS: kafka:29092
      CONNECT_REST_PORT: 28083
      CONNECT_GROUP_ID: "quickstart-avro"
      CONNECT_CONFIG_STORAGE_TOPIC: "quickstart-avro-config"
      CONNECT_OFFSET_STORAGE_TOPIC: "quickstart-avro-offsets"
      CONNECT_STATUS_STORAGE_TOPIC: "quickstart-avro-status"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_REST_ADVERTISED_HOST_NAME: "localhost"
      CONNECT_LOG4J_ROOT_LOGLEVEL: DEBUG
      CONNECT_PLUGIN_PATH: "/usr/share/java,/etc/kafka-connect/jars"
    volumes:
      - $PWD/jars:/etc/kafka-connect/jars
```

Before you run docker-compose, make sure your file structure looks like this:

```
.
├── docker-compose.yml
└── jars
    ├── mysql-connector-java-8.0.22.jar
    └── (all the .jar files from confluentinc-kafka-connect-jdbc-10.0-2.1/lib/)
```

Let the fun begin

Let the fun begin by starting the mysql service and opening a shell inside the container:

```shell
docker-compose up -d mysql
docker exec -it kafka-connector_mysql_1 bash
```

Execute the following queries using the MySQL CLI (mysql -uroot -ptest):

```sql
CREATE DATABASE IF NOT EXISTS connect_test;
USE connect_test;
CREATE TABLE IF NOT EXISTS test (
  id serial NOT NULL PRIMARY KEY,
  name varchar(100),
  email varchar(200),
  department varchar(200),
  modified timestamp default CURRENT_TIMESTAMP NOT NULL,
  INDEX `modified_index` (`modified`)
);
INSERT INTO test (name, email, department) VALUES ('alice', 'alice@abc.com', 'engineering');
INSERT INTO test (name, email, department) VALUES ('bob', 'bob@abc.com', 'sales');
INSERT INTO test (name, email, department) VALUES ('bob', 'bob@abc.com', 'sales');
INSERT INTO test (name, email, department) VALUES ('bob', 'bob@abc.com', 'sales');
INSERT INTO test (name, email, department) VALUES ('bob', 'bob@abc.com', 'sales');
INSERT INTO test (name, email, department) VALUES ('bob', 'bob@abc.com', 'sales');
INSERT INTO test (name, email, department) VALUES ('bob', 'bob@abc.com', 'sales');
INSERT INTO test (name, email, department) VALUES ('bob', 'bob@abc.com', 'sales');
INSERT INTO test (name, email, department) VALUES ('bob', 'bob@abc.com', 'sales');
INSERT INTO test (name, email, department) VALUES ('bob', 'bob@abc.com', 'sales');
exit;
```

Let's start ZooKeeper and Kafka:

```shell
docker-compose up -d zookeeper kafka
```

Wait a few seconds, then make sure that all services are up and running:

```shell
docker ps
```

Once ZooKeeper, Kafka, and MySQL are all up and running, let's prepare our final course, Confluent Kafka Connect.
First, create the topics that will be used by the connector:

```shell
docker-compose run --rm kafka kafka-topics --create --topic quickstart-avro-offsets --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:32181 --config cleanup.policy=compact
docker-compose run --rm kafka kafka-topics --create --topic quickstart-avro-config --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:32181 --config cleanup.policy=compact
docker-compose run --rm kafka kafka-topics --create --topic quickstart-avro-status --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:32181 --config cleanup.policy=compact
```

Finally, let's start Kafka Connect:

```shell
docker-compose up -d kafka-connector-mysql
```

Give it a few seconds, then create the JDBC source connector by making a REST API call to the Kafka Connect service:

```shell
curl -X POST \
  -H "Content-Type: application/json" \
  --data '{
    "name": "quickstart-jdbc-source",
    "config": {
      "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
      "tasks.max": 1,
      "connection.url": "jdbc:mysql://mysql:3306/connect_test",
      "connection.user": "root",
      "connection.password": "test",
      "mode": "incrementing",
      "incrementing.column.name": "id",
      "timestamp.column.name": "modified",
      "topic.prefix": "quickstart-jdbc-",
      "poll.interval.ms": 1000
    }
  }' \
  http://localhost:28083/connectors
```

Wait a few seconds, then check the status to make sure the connector is RUNNING:

```shell
curl -s -X GET http://localhost:28083/connectors/quickstart-jdbc-source/status
```

You should see something similar to

```json
{"name":"quickstart-jdbc-source","connector":{"state":"RUNNING","worker_id":"localhost:28083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"localhost:28083"}],"type":"source"}
```
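If you are scripting this setup, a tiny polling helper saves you from eyeballing the status response by hand. This is just a sketch; the wait_for name and the retry count are made up here, not part of the Connect API:

```shell
#!/bin/sh
# Poll a command until its output contains a pattern, retrying up to a given
# number of times with a one-second pause between attempts.
wait_for() {
  pattern=$1
  tries=$2
  shift 2
  i=0
  while [ "$i" -lt "$tries" ]; do
    if "$@" | grep -q "$pattern"; then
      return 0
    fi
    i=$((i + 1))
    sleep 1
  done
  return 1
}

# Usage against the Connect REST API (run once the services are up):
# wait_for '"state":"RUNNING"' 30 curl -s http://localhost:28083/connectors/quickstart-jdbc-source/status
```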

Now let's verify our work by running a Kafka consumer:

```shell
docker-compose run --rm kafka kafka-console-consumer --bootstrap-server kafka:29092 --topic quickstart-jdbc-test --from-beginning
```

and you should see records like the following show up:

```json
{"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"email"},{"type":"string","optional":true,"field":"department"},{"type":"int64","optional":false,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"modified"}],"optional":false,"name":"test"},"payload":{"id":18,"name":"bob","email":"bob@abc.com","department":"sales","modified":1606418867000}}
{"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"email"},{"type":"string","optional":true,"field":"department"},{"type":"int64","optional":false,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"modified"}],"optional":false,"name":"test"},"payload":{"id":19,"name":"bob","email":"bob@abc.com","department":"sales","modified":1606418867000}}
{"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"email"},{"type":"string","optional":true,"field":"department"},{"type":"int64","optional":false,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"modified"}],"optional":false,"name":"test"},"payload":{"id":20,"name":"bob","email":"bob@abc.com","department":"sales","modified":1606418867000}}
^CProcessed a total of 10 messages
```
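Each record is a JSON envelope with a schema half and a payload half, because the worker uses JsonConverter with schemas enabled. A quick way to pull fields out of the payload (record trimmed here for illustration):

```shell
#!/bin/sh
# One of the consumed records, trimmed to its payload for brevity.
record='{"schema":{"type":"struct"},"payload":{"id":18,"name":"bob","email":"bob@abc.com","department":"sales","modified":1606418867000}}'

# Extract a payload field with Python's json module.
echo "$record" | python3 -c 'import json, sys; print(json.load(sys.stdin)["payload"]["name"])'
# -> bob
```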

That's it. To tear it all down, simply run:

```shell
docker-compose down
```

Enjoy.

Top comments (2)

zakrian07:
docker ps always shows the services as starting and the curl request does not succeed. Any idea how to fix it?

nhatdiec (edited):
Hi Scott Wang,
I'm a newbie on Kafka. I followed this article and got the error shown in the image below. Am I missing any files?
dev-to-uploads.s3.amazonaws.com/up...