DEV Community

Fabio Ghirardello for Cockroach Labs


Ingesting data from Kafka to CockroachDB via Kafka Connect

In this brief write-up, I demonstrate how to build a working pipeline to ingest Kafka records into CockroachDB via Kafka Connect's JDBC Sink Connector, locally, using Docker.

This is a functional test: the goal is to understand the concepts and all the moving parts before moving on to performance testing on real, production-grade clusters.
Stay tuned for a follow-up blog post where we build on this setup to run a performance test.
UPDATE: performance test is here!

Setup

Save the following as kafka2crdb.yaml:

# kafka2crdb.yaml
---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - 2181:2181
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  broker:
    image: confluentinc/cp-server:7.3.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
      - 9101:9101
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
  schema-registry:
    image: confluentinc/cp-schema-registry:7.3.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - 8081:8081
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
  kafka-connect:
    image: cnfldemos/cp-server-connect-datagen:0.6.0-7.3.0
    hostname: connect
    container_name: connect
    depends_on:
      - broker
      - schema-registry
    ports:
      - 8083:8083
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-7.3.0.jar
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
    command:
      - bash
      - -c
      - |
        # Installing connector plugin
        confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:latest
        # Downloading newer Postgres JDBC driver
        cd /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib
        rm -rf postgresql*
        wget https://jdbc.postgresql.org/download/postgresql-42.5.0.jar
        # Launching Kafka Connect worker
        /etc/confluent/docker/run &
        sleep infinity
  control-center:
    image: confluentinc/cp-enterprise-control-center:7.3.0
    hostname: control-center
    container_name: control-center
    depends_on:
      - broker
      - schema-registry
      - kafka-connect
    ports:
      - 9021:9021
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
      CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'connect:8083'
      CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088"
      CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088"
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021
  cockroach:
    image: cockroachdb/cockroach:latest
    container_name: cockroach-1
    command: start-single-node --insecure
    ports:
      - 26257:26257
      - 8080:8080

Notice how the kafka-connect service definition installs the latest version of the JDBC Sink Connector and replaces its bundled PostgreSQL JDBC driver with a newer version (42.5.0).
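To double-check the driver swap, you can list the JARs in the connector's lib directory. Below is a small shell sketch; the function names are hypothetical, and it assumes that after the `rm -rf postgresql*` step exactly one `postgresql-*.jar` should remain, so that two driver versions don't end up side by side in the plugin directory.

```sh
# Hypothetical helpers, not part of any Confluent tooling.

# Print only the PostgreSQL JDBC driver JARs from a directory listing on stdin.
pg_driver_jars() {
  grep -E '^postgresql-[0-9][0-9.]*\.jar$'
}

# Succeed only if the listing on stdin contains exactly one driver JAR.
single_pg_driver() {
  [ "$(pg_driver_jars | wc -l | tr -d ' ')" -eq 1 ]
}

# Usage against the running stack:
#   docker exec connect ls /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib \
#     | pg_driver_jars
#   docker exec connect ls /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib \
#     | single_pg_driver && echo "one PostgreSQL driver in the plugin directory"
```

With the compose file as written, you would expect the listing to show only postgresql-42.5.0.jar.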

Bring up the Docker Compose stack.

Note:
You might have to give Docker more resources.
I use Docker Desktop for macOS configured with 12 CPUs and 22 GB of memory.

docker-compose -f kafka2crdb.yaml up -d 

Make sure everything is up and running:

$ docker-compose -f kafka2crdb.yaml ps
NAME                COMMAND                  SERVICE             STATUS              PORTS
broker              "/etc/confluent/dock…"   broker              running             0.0.0.0:9092->9092/tcp, 0.0.0.0:9101->9101/tcp
cockroach-1         "/cockroach/cockroac…"   cockroach           running             0.0.0.0:8080->8080/tcp, 0.0.0.0:26257->26257/tcp
connect             "bash -c '# Installi…"   kafka-connect       running             0.0.0.0:8083->8083/tcp, 9092/tcp
control-center      "/etc/confluent/dock…"   control-center      running             0.0.0.0:9021->9021/tcp
schema-registry     "/etc/confluent/dock…"   schema-registry     running             0.0.0.0:8081->8081/tcp
zookeeper           "/etc/confluent/dock…"   zookeeper           running             2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp
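A container reported as running doesn't mean Kafka Connect is ready: the connect container first installs the connector plugin and downloads the driver, and only then starts the worker. A sketch of a wait loop against the Kafka Connect REST API's `GET /connector-plugins` endpoint, assuming `curl` on the host; the function name and the retry defaults are arbitrary choices.

```sh
# Poll the Kafka Connect REST API until the JDBC Sink Connector plugin is
# listed, or give up after a number of attempts.
#   $1: Connect REST URL (default http://localhost:8083)
#   $2: max attempts (default 60)
#   $3: seconds between attempts (default 5)
wait_for_jdbc_plugin() {
  local url="${1:-http://localhost:8083}" tries="${2:-60}" delay="${3:-5}" i=1
  while [ "$i" -le "$tries" ]; do
    # GET /connector-plugins returns the installed connector classes as JSON
    if curl -sf --max-time 5 "$url/connector-plugins" | grep -q 'JdbcSinkConnector'; then
      echo "Kafka Connect is up and the JDBC Sink Connector plugin is installed"
      return 0
    fi
    i=$(( i + 1 ))
    # only sleep if another attempt follows
    if [ "$i" -le "$tries" ]; then sleep "$delay"; fi
  done
  echo "gave up waiting for $url after $tries attempts" >&2
  return 1
}

# Usage:
#   wait_for_jdbc_plugin && echo "ready to register connectors"
```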

Open the Control Center at http://localhost:9021 and wait until the broker shows up as healthy.

Open another tab for the CockroachDB Console at http://localhost:8080.

Demo

We'll build the following pipeline:

datagen(source) --> Kafka Topic --> Kafka JDBC Sink Connector --> CockroachDB(target)

You can also do most of the tasks below from within the Control Center.

Create Kafka topic

Connect to the broker container and create a topic named transactions with 4 partitions:

docker exec -it broker /bin/bash 

Once in the broker shell, run:

# create topic with 4 partitions
$ kafka-topics --bootstrap-server broker:9092 --create --topic transactions --partitions 4
# describe it
$ kafka-topics --bootstrap-server broker:9092 --topic transactions --describe
Topic: transactions    TopicId: F6NF00Z8Ry2xIWnbYVF9vg    PartitionCount: 4    ReplicationFactor: 1    Configs:
    Topic: transactions    Partition: 0    Leader: 1    Replicas: 1    Isr: 1    Offline:
    Topic: transactions    Partition: 1    Leader: 1    Replicas: 1    Isr: 1    Offline:
    Topic: transactions    Partition: 2    Leader: 1    Replicas: 1    Isr: 1    Offline:
    Topic: transactions    Partition: 3    Leader: 1    Replicas: 1    Isr: 1    Offline:
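If you script this step instead of eyeballing the output, counting the per-partition lines of the describe output is enough. A minimal sketch; `partition_count` is a hypothetical helper that relies only on the `Partition: <n>` fields shown above.

```sh
# Count partitions in `kafka-topics --describe` output read from stdin.
# Each partition is printed on its own line containing "Partition: <n>";
# the summary line uses "PartitionCount:", which this pattern does not match.
partition_count() {
  grep -cE 'Partition: [0-9]+'
}

# Usage (from the host):
#   docker exec broker kafka-topics --bootstrap-server broker:9092 \
#     --topic transactions --describe | partition_count
```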

With the topic split into 4 partitions, we can run an equal number of consumer processes, or tasks, that read from the topic and write into CockroachDB in parallel.
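Each sink task runs as a consumer in the connector's consumer group, and the group's assignor maps partitions to consumers. The sketch below illustrates range-style assignment for a single topic, assuming the consumers use the RangeAssignor (the first entry in the default `partition.assignment.strategy` of recent Kafka clients). `range_assign` is a hypothetical helper, and "consumer N" is just a slot in the group's sort order, not a specific task ID. It shows why tasks beyond the partition count would sit idle.

```sh
# Hypothetical sketch of range-style assignment for a single topic:
# P partitions over C consumers; each gets P/C partitions and the first
# P%C consumers get one extra. Consumers beyond P get nothing (idle).
#   $1: number of partitions   $2: number of consumers (tasks)
range_assign() {
  local partitions=$1 consumers=$2
  [ "$consumers" -gt 0 ] || return 1
  local per=$(( partitions / consumers )) extra=$(( partitions % consumers ))
  local start=0 c=0 n p list
  while [ "$c" -lt "$consumers" ]; do
    n=$per
    if [ "$c" -lt "$extra" ]; then n=$(( n + 1 )); fi
    # collect the contiguous range [start, start + n)
    list="" p=$start
    while [ "$p" -lt $(( start + n )) ]; do
      list="${list:+$list,}$p"
      p=$(( p + 1 ))
    done
    echo "consumer $c: ${list:-idle}"
    start=$(( start + n ))
    c=$(( c + 1 ))
  done
}

# Usage:
#   range_assign 4 4   # one partition per consumer
#   range_assign 4 6   # two consumers stay idle
```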

All set, now we are ready to ingest data into this topic.

Configure the Source Connector

Our source will be a built-in data generator.

Open a new terminal and create the Datagen connector:

# the connect container exposes the Kafka Connect REST API on port 8083
curl -s -X POST http://localhost:8083/connectors/ \
  -H "Content-Type: application/json" \
  -d '{
    "name": "datagen-transactions",
    "config": {
      "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "kafka.topic": "transactions",
      "tasks.max": "64",
      "max.interval": "100",
      "quickstart": "transactions"
    }
  }' | jq '.'
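Registering the connector only echoes back its configuration; to confirm it actually started, query `GET /connectors/<name>/status`. The helper below is a sketch with a hypothetical name that checks every `state` field with plain text matching, an assumption that holds for the compact JSON the REST API returns; piping through `jq` remains the more robust way to inspect the response by eye.

```sh
# Read a connector status document (JSON from GET /connectors/<name>/status)
# on stdin; succeed only if the connector and all of its tasks are RUNNING.
all_running() {
  local states
  # extract the value of every "state" field, one per line
  states=$(grep -oE '"state" *: *"[A-Z_]+"' | tr -d ' ' | cut -d'"' -f4)
  # at least one state found, and none of them different from RUNNING
  [ -n "$states" ] && ! printf '%s\n' "$states" | grep -qv '^RUNNING$'
}

# Usage:
#   curl -s http://localhost:8083/connectors/datagen-transactions/status | jq '.'
#   curl -s http://localhost:8083/connectors/datagen-transactions/status \
#     | all_running && echo "datagen connector and tasks are RUNNING"
```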

Visit the Topics section of the Control Center to confirm that messages are being generated.

[Screenshot: topics]

Also check the Messages and Schema tabs to see what the records look like.

[Screenshot: messages]
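The same schema can be fetched from the command line through the Schema Registry REST API. A sketch, assuming the default `TopicNameStrategy`, under which the value schema of topic `transactions` is registered as subject `transactions-value` (the keys use the StringConverter, so no key schema is expected). The function names and the `SR_URL` override are hypothetical.

```sh
# Subject of a topic's value schema under the default TopicNameStrategy.
value_subject() {
  printf '%s-value\n' "$1"
}

# Fetch the latest registered value schema for a topic.
#   $1: topic name; SR_URL overrides the default http://localhost:8081
latest_value_schema() {
  curl -sf --max-time 5 \
    "${SR_URL:-http://localhost:8081}/subjects/$(value_subject "$1")/versions/latest"
}

# Usage:
#   curl -s http://localhost:8081/subjects | jq '.'   # expect "transactions-value"
#   latest_value_schema transactions | jq -r '.schema | fromjson'
```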

Configure the Sink Connector

We're ready to set up the JDBC Sink Connector to ingest data into CockroachDB.

Note how we set batch.size to the same value as the consumer's max.poll.records (set through consumer.override.max.poll.records), 128, so that each transaction contains at most 128 records.

Because we set reWriteBatchedInserts=true in the connection URL, the PostgreSQL JDBC driver rewrites the 128 individual INSERT statements into a single, multi-row INSERT statement.
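To make the rewrite concrete, here is a sketch that builds the text of such a statement: one INSERT whose VALUES list carries one tuple of bind parameters per record. It illustrates the resulting shape only and is not the driver's actual code; `multi_row_insert` is a hypothetical helper, and the statement the connector really generates also quotes identifiers.

```sh
# Illustration only: build the text of a multi-row INSERT that folds
# <rows> single-row INSERTs into one statement with $n bind parameters.
#   $1: table   $2: comma-separated column list   $3: number of rows
multi_row_insert() {
  local table=$1 cols=$2 rows=$3
  local ncols
  # number of columns = number of comma-separated fields
  ncols=$(printf '%s' "$cols" | awk -F, '{ print NF }')
  local n=1 r=0 c tuple values=""
  while [ "$r" -lt "$rows" ]; do
    tuple="" c=0
    while [ "$c" -lt "$ncols" ]; do
      tuple="${tuple:+$tuple,}\$$n"   # next positional placeholder: $1, $2, ...
      n=$(( n + 1 )); c=$(( c + 1 ))
    done
    values="${values:+$values,}($tuple)"
    r=$(( r + 1 ))
  done
  echo "INSERT INTO $table ($cols) VALUES $values"
}

# Usage:
#   multi_row_insert transactions transaction_id,card_id 2
#   -> INSERT INTO transactions (transaction_id,card_id) VALUES ($1,$2),($3,$4)
```

For a batch of 128 records with the 5 columns of the transactions table, that is a single statement with 128 tuples and 640 bind parameters.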

Note, however, that this single-statement transaction is still an explicit transaction, because the stock connector disables auto-commit and commits each batch itself.

Note:
I rebuilt the connector with autocommit=true to take advantage of implicit transactions.
I therefore replaced the original file kafka-connect-jdbc-10.6.1.jar in /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib/ with my custom-built JAR file.
The screenshots below show the results obtained with this custom build.

We also set tasks.max=4 to have 4 consumer processes reading from the 4 topic partitions and ingesting data into CockroachDB in parallel. Each task creates its own database connection and reads from a specific topic partition.

# register the connector
curl -s -X POST http://localhost:8083/connectors/ \
  -H "Content-Type: application/json" \
  -d '{
    "name": "sink-crdb",
    "config": {
      "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
      "connection.url": "jdbc:postgresql://cockroach-1:26257/defaultdb?reWriteBatchedInserts=true&ApplicationName=txns",
      "topics": "transactions",
      "tasks.max": "4",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "value.converter": "io.confluent.connect.avro.AvroConverter",
      "value.converter.schema.registry.url": "http://schema-registry:8081",
      "connection.user": "root",
      "connection.password": "",
      "auto.create": true,
      "auto.evolve": true,
      "insert.mode": "insert",
      "pk.mode": "none",
      "pk.fields": "none",
      "batch.size": 128,
      "consumer.override.max.poll.records": 128
    }
  }' | jq '.'
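To verify how the partitions were spread across the 4 sink tasks, describe the connector's consumer group from the broker. Kafka Connect names a sink connector's consumer group `connect-<connector name>` by default, so here it should be `connect-sink-crdb`, and each of the 4 partitions should show its own consumer ID. The `total_lag` helper below is a hypothetical sketch that sums the LAG column, a quick way to see whether the sink keeps up with the datagen source.

```sh
# Sum the LAG column of `kafka-consumer-groups --describe` output (stdin).
# The column is located by its header name rather than by position.
total_lag() {
  awk '
    # find the LAG column in the header line
    !col { for (i = 1; i <= NF; i++) if ($i == "LAG") col = i; if (col) next }
    # add numeric lag values; "-" (no committed offset yet) is skipped
    col && $col ~ /^[0-9]+$/ { sum += $col }
    END { print sum + 0 }
  '
}

# Usage:
#   docker exec broker kafka-consumer-groups --bootstrap-server broker:29092 \
#     --describe --group connect-sink-crdb
#   docker exec broker kafka-consumer-groups --bootstrap-server broker:29092 \
#     --describe --group connect-sink-crdb | total_lag
```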

You should now see 4 active SQL connections to CockroachDB and activity on the SQL statements chart.

[Screenshot: statements chart]
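The same check can be done in SQL. Because the sink's connection.url sets `ApplicationName=txns`, its sessions should report `application_name = 'txns'`. A sketch, assuming the container name `cockroach-1` from the compose file (overridable through a hypothetical `CRDB_CONTAINER` variable) and that the last line of the `--format=csv` output holds the count:

```sh
# Count open SQL sessions for a given application_name in CockroachDB.
#   $1: application name; CRDB_CONTAINER overrides the container name
app_sessions() {
  local out
  out=$(docker exec "${CRDB_CONTAINER:-cockroach-1}" cockroach sql --insecure --format=csv \
    -e "SELECT count(*) FROM [SHOW CLUSTER SESSIONS] WHERE application_name = '$1'") || return 1
  # CSV output: header line ("count") followed by the value
  printf '%s\n' "$out" | tail -n 1
}

# Usage (with tasks.max=4 you would expect 4):
#   app_sessions txns
```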

In the DB Console, on the SQL Activity > Transactions page, we can confirm that each transaction writes 128 rows.

[Screenshot: transaction]

Drilling down into the transaction shows how many statements it is composed of. In this case, as expected, it is a single INSERT statement.

[Screenshot: transaction description]

Drilling further into the statement itself, we can see that it is a multi-row INSERT statement with 128 records, executed as an implicit transaction.

[Screenshot: statement description]

Query data in CockroachDB

Open a SQL prompt:

docker exec -it cockroach-1 cockroach sql --insecure 
-- the table was automatically created by the Kafka JDBC Sink Connector
-- this behavior is of course configurable
> SHOW TABLES;
  schema_name |  table_name  | type  | owner | estimated_row_count | locality
--------------+--------------+-------+-------+---------------------+-----------
  public      | transactions | table | root  |                   0 | NULL
-- check the schema of the created table
-- note how CockroachDB created its own Primary Key
-- this is also configurable
> SHOW CREATE transactions;
   table_name  |                       create_statement
---------------+--------------------------------------------------------------
  transactions | CREATE TABLE public.transactions (
               |     transaction_id INT8 NOT NULL,
               |     card_id INT8 NOT NULL,
               |     user_id STRING NOT NULL,
               |     purchase_id INT8 NOT NULL,
               |     store_id INT8 NOT NULL,
               |     rowid INT8 NOT VISIBLE NOT NULL DEFAULT unique_rowid(),
               |     CONSTRAINT transactions_pkey PRIMARY KEY (rowid ASC)
               | )
-- inspect a few rows
> SELECT * FROM transactions LIMIT 5;
  transaction_id | card_id | user_id | purchase_id | store_id
-----------------+---------+---------+-------------+-----------
               1 |       7 | User_6  |           0 |        1
               1 |      19 | User_7  |           0 |        6
               1 |      18 | User_5  |           0 |        6
               1 |       6 | User_   |           0 |        3
               1 |      13 | User_   |           0 |        3
-- avoid contention by using AS OF SYSTEM TIME when running large aggregate queries
> SELECT count(*) FROM transactions AS OF SYSTEM TIME '-5s';
   count
----------
  594633
-- wait a few seconds, then query again to see the count increasing...
> SELECT count(*) FROM transactions AS OF SYSTEM TIME '-5s';
   count
----------
  594992
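To turn two such counts into a rough ingest rate, you can sample the count twice, a fixed interval apart. A sketch with hypothetical helper names, assuming the `cockroach-1` container and that the last line of the `cockroach sql --format=csv` output holds the value; it reuses the AS OF SYSTEM TIME '-5s' query from the session above, so both samples lag by the same amount.

```sh
# Current row count, read 5 seconds in the past to avoid contention.
row_count() {
  docker exec "${CRDB_CONTAINER:-cockroach-1}" cockroach sql --insecure --format=csv \
    -e "SELECT count(*) FROM transactions AS OF SYSTEM TIME '-5s'" | tail -n 1
}

# Integer rows per second between two counts taken $3 seconds apart.
#   $1: first count   $2: second count   $3: interval in seconds
rows_per_sec() {
  [ "$3" -gt 0 ] || return 1
  echo $(( ($2 - $1) / $3 ))
}

# Usage:
#   c1=$(row_count); sleep 10; c2=$(row_count); rows_per_sec "$c1" "$c2" 10
```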

You can view more metrics and statement statistics on the DB Console, at http://localhost:8080.

Clean up

Bring down and remove all containers:

docker-compose -f kafka2crdb.yaml down 

