In this brief write-up, I demonstrate how to build a working pipeline to ingest Kafka records into CockroachDB via Kafka Connect's JDBC Sink Connector, locally, using Docker.
This serves as a functional test: the goal is to understand the concepts and all the moving parts before moving on to performance testing on real production-grade clusters.
Stay tuned for a follow-up blog where we will use this content to build and run a performance test.
UPDATE: performance test is here!
Setup
Save the following as kafka2crdb.yaml.
# kafka2crdb.yaml
---
version: '2'
services:

  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - 2181:2181
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-server:7.3.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
      - 9101:9101
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

  schema-registry:
    image: confluentinc/cp-schema-registry:7.3.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - 8081:8081
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

  kafka-connect:
    image: cnfldemos/cp-server-connect-datagen:0.6.0-7.3.0
    hostname: connect
    container_name: connect
    depends_on:
      - broker
      - schema-registry
    ports:
      - 8083:8083
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-7.3.0.jar
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
    command:
      - bash
      - -c
      - |
        # Installing connector plugin
        confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:latest
        # Downloading newer Postgres JDBC driver
        cd /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib
        rm -rf postgresql*
        wget https://jdbc.postgresql.org/download/postgresql-42.5.0.jar
        # Launching Kafka Connect worker
        /etc/confluent/docker/run &
        sleep infinity

  control-center:
    image: confluentinc/cp-enterprise-control-center:7.3.0
    hostname: control-center
    container_name: control-center
    depends_on:
      - broker
      - schema-registry
      - kafka-connect
    ports:
      - 9021:9021
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
      CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'connect:8083'
      CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088"
      CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088"
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021

  cockroach:
    image: cockroachdb/cockroach:latest
    container_name: cockroach-1
    command: start-single-node --insecure
    ports:
      - 26257:26257
      - 8080:8080
Notice in the kafka-connect definition how we install the latest version of the JDBC Sink Connector and replace the bundled PostgreSQL JDBC driver with a newer one.
Bring up the Docker Compose environment.
Note:
You might have to tweak your Docker environment to allow for more resources.
I use Docker Desktop for macOS configured with 12 CPUs and 22 GB of memory.
docker-compose -f kafka2crdb.yaml up -d
Make sure everything is up and running:
$ docker-compose -f kafka2crdb.yaml ps
NAME              COMMAND                  SERVICE           STATUS    PORTS
broker            "/etc/confluent/dock…"   broker            running   0.0.0.0:9092->9092/tcp, 0.0.0.0:9101->9101/tcp
cockroach-1       "/cockroach/cockroac…"   cockroach         running   0.0.0.0:8080->8080/tcp, 0.0.0.0:26257->26257/tcp
connect           "bash -c '# Installi…"   kafka-connect     running   0.0.0.0:8083->8083/tcp, 9092/tcp
control-center    "/etc/confluent/dock…"   control-center    running   0.0.0.0:9021->9021/tcp
schema-registry   "/etc/confluent/dock…"   schema-registry   running   0.0.0.0:8081->8081/tcp
zookeeper         "/etc/confluent/dock…"   zookeeper         running   2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp
Open the Control Center at http://localhost:9021 and wait until the broker shows up as healthy.
Open another tab for the CockroachDB Console at http://localhost:8080.
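The Kafka Connect worker installs the connector plugin and downloads the JDBC driver at startup, so its REST API on port 8083 may take a while to respond. Below is a minimal sketch of a readiness check from the terminal, assuming curl is installed on the host. The function name wait_for_connect and the retry defaults are illustrative choices, not part of the original setup.

```shell
# Poll the Kafka Connect REST API until GET /connectors answers with HTTP 200.
# wait_for_connect, the retry count and the delay are hypothetical defaults.
wait_for_connect() {
  local url="${1:-http://localhost:8083/connectors}"
  local retries="${2:-30}"
  local delay="${3:-5}"
  local attempt=0 code
  while [ "$attempt" -lt "$retries" ]; do
    # -w '%{http_code}' prints only the status code; the body is discarded.
    # curl exits non-zero while the port is still closed, hence "|| true".
    code="$(curl -s -o /dev/null -w '%{http_code}' "$url" || true)"
    if [ "$code" = "200" ]; then
      echo "Kafka Connect is ready"
      return 0
    fi
    attempt=$((attempt + 1))
    sleep "$delay"
  done
  echo "Kafka Connect did not become ready in time" >&2
  return 1
}
```

Running wait_for_connect with no arguments checks http://localhost:8083/connectors every 5 seconds, up to 30 times.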
Demo
We will build the following pipeline:
datagen(source) --> Kafka Topic --> Kafka JDBC Sink Connector --> CockroachDB(target)
You can do most of the tasks below from within the Control Center.
Create Kafka topic
Connect to the broker container and create a topic named transactions with 4 partitions.
docker exec -it broker /bin/bash
Once in the broker shell:
# create topic with 4 partitions
$ kafka-topics --bootstrap-server broker:9092 --create --topic transactions --partitions 4

# describe it
$ kafka-topics --bootstrap-server broker:9092 --topic transactions --describe
Topic: transactions   TopicId: F6NF00Z8Ry2xIWnbYVF9vg   PartitionCount: 4   ReplicationFactor: 1   Configs:
    Topic: transactions   Partition: 0   Leader: 1   Replicas: 1   Isr: 1   Offline:
    Topic: transactions   Partition: 1   Leader: 1   Replicas: 1   Isr: 1   Offline:
    Topic: transactions   Partition: 2   Leader: 1   Replicas: 1   Isr: 1   Offline:
    Topic: transactions   Partition: 3   Leader: 1   Replicas: 1   Isr: 1   Offline:
With the topic split into 4 partitions, we can have an equal number of consumer processes, or tasks, consuming from the topic and ingesting into CockroachDB in parallel.
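The relationship between partitions and tasks can be sketched as simple arithmetic. Within a consumer group, a partition is read by at most one consumer, so sink tasks beyond the partition count receive nothing to do. The helper name active_tasks is made up for illustration.

```shell
# Number of sink tasks that actually receive partitions:
# each partition is consumed by at most one member of the consumer group,
# so the useful parallelism is min(tasks.max, partitions).
# active_tasks is a hypothetical helper, not part of Kafka Connect.
active_tasks() {
  local tasks_max="$1" partitions="$2"
  if [ "$tasks_max" -lt "$partitions" ]; then
    echo "$tasks_max"
  else
    echo "$partitions"
  fi
}

active_tasks 4 4   # 4: one partition per task
active_tasks 8 4   # 4: the other four tasks stay idle
active_tasks 2 4   # 2: each task reads more than one partition
```

This is why the topic is created with 4 partitions here: it leaves room for 4 tasks to write in parallel.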
All set: we are now ready to ingest data into this topic.
Configure the Source Connector
Our source will be a built-in data generator.
Open a new terminal, and create the Datagen Connector.
# the datagen-connect container listens on port 8083
curl -s -X POST http://localhost:8083/connectors/ \
  -H "Content-Type: application/json" -d '{
    "name": "datagen-transactions",
    "config": {
      "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "kafka.topic": "transactions",
      "tasks.max": "64",
      "max.interval": "100",
      "quickstart": "transactions"
    }
  }' | jq '.'
Confirm in the Control Center that messages are being generated by visiting the Topics section.
Also check the Messages and Schema tabs to see what the records look like.
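If you prefer the terminal to the Control Center, the Kafka Connect REST API exposes the state of a connector and its tasks at GET /connectors/<name>/status. Here is a minimal sketch, assuming curl and jq are installed on the host; connector_states is a hypothetical helper name.

```shell
# Print the connector state followed by the state of each of its tasks,
# one per line, using the Kafka Connect REST API status endpoint.
# connector_states is a hypothetical helper; the base URL defaults to the
# port published by the kafka-connect service in kafka2crdb.yaml.
connector_states() {
  local name="$1"
  local base="${2:-http://localhost:8083}"
  curl -s "$base/connectors/$name/status" \
    | jq -r '.connector.state, .tasks[].state'
}
```

Calling connector_states datagen-transactions should print RUNNING for the connector and for each task once everything is healthy. The same helper works for any connector name.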
Configure the Sink Connector
We're ready to set up the JDBC Sink Connector to ingest data into CockroachDB.
Note how we set batch.size to the same value as max.poll.records so that each transaction includes at most 128 records.
Because we've set reWriteBatchedInserts=true, the PostgreSQL JDBC driver will combine the 128 individual INSERT statements into a single, multi-record INSERT statement.
Note, however, that this single-statement transaction is still an explicit transaction.
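To make the effect of reWriteBatchedInserts=true concrete, here is a rough illustration of the shape of the rewritten statement. It only prints text: the real rewrite happens inside the JDBC driver, and the exact SQL sent on the wire may differ. The helper name multi_row_insert is made up, and only two of the table's columns are shown for brevity.

```shell
# Print the shape of a multi-row INSERT with the given number of rows.
# Illustration only: multi_row_insert is a hypothetical helper, and the
# actual statement is produced by the PostgreSQL JDBC driver, not by shell.
multi_row_insert() {
  local table="$1" columns="$2" placeholders="$3" rows="$4"
  local values="" i=0
  while [ "$i" -lt "$rows" ]; do
    # Append "(?, ?)" groups, comma-separated after the first one.
    values="${values:+$values, }($placeholders)"
    i=$((i + 1))
  done
  echo "INSERT INTO $table ($columns) VALUES $values"
}

multi_row_insert transactions "transaction_id, card_id" "?, ?" 3
# INSERT INTO transactions (transaction_id, card_id) VALUES (?, ?), (?, ?), (?, ?)
```

With batch.size set to 128, the same pattern would carry 128 value groups in one statement instead of 3.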
Note:
I've rebuilt the connector with autocommit=true to leverage implicit transactions.
I have therefore replaced the original file kafka-connect-jdbc-10.6.1.jar in /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib/ with my custom-built JAR file.
The screenshots below show the results from using this custom build.
We also set tasks.max=4 to have 4 consumer processes reading from the 4 topic partitions and ingesting data into CockroachDB in parallel. Each task creates its own database connection and reads from a specific topic partition.
# register the connector
curl -s -X POST http://localhost:8083/connectors/ \
  -H "Content-Type: application/json" -d '{
    "name": "sink-crdb",
    "config": {
      "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
      "connection.url": "jdbc:postgresql://cockroach-1:26257/defaultdb?reWriteBatchedInserts=true&ApplicationName=txns",
      "topics": "transactions",
      "tasks.max": "4",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "value.converter": "io.confluent.connect.avro.AvroConverter",
      "value.converter.schema.registry.url": "http://schema-registry:8081",
      "connection.user": "root",
      "connection.password": "",
      "auto.create": true,
      "auto.evolve": true,
      "insert.mode": "insert",
      "pk.mode": "none",
      "pk.fields": "none",
      "batch.size": 128,
      "consumer.override.max.poll.records": 128
    }
  }' | jq '.'
You should now see 4 active SQL connections established to CockroachDB, and activity on the SQL Statement chart.
In the CockroachDB Console, on the SQL Activity > Transaction page, we can confirm that each transaction writes 128 rows.
Drilling down into the transaction, we can see how many statements it is composed of. In this case, as expected, it is just 1 INSERT statement.
Drilling down further into the statement itself, we can see that it is a multi-record INSERT statement with 128 records, executed as an implicit transaction.
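The connection count can also be checked from the command line. The sketch below assumes the container name cockroach-1 from kafka2crdb.yaml and relies on the ApplicationName=txns parameter in the sink's connection URL. The helper name count_sessions is hypothetical.

```shell
# Count open SQL sessions per application name, printed as CSV.
# The sink's connection URL sets ApplicationName=txns, so the sessions opened
# by its tasks should be grouped under that name.
# count_sessions is a hypothetical helper.
count_sessions() {
  docker exec cockroach-1 cockroach sql --insecure --format=csv -e \
    "SELECT application_name, count(*) FROM [SHOW SESSIONS] GROUP BY application_name ORDER BY application_name;"
}
```

With tasks.max=4, the txns row should report roughly four sessions; the exact number may vary slightly if the connector holds an extra connection.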
Query data in CockroachDB
Open a SQL prompt
docker exec -it cockroach-1 cockroach sql --insecure
-- the table was automatically created by the Kafka JDBC Sink Connector
-- this behavior is of course configurable
> SHOW TABLES;
  schema_name |  table_name  | type  | owner | estimated_row_count | locality
--------------+--------------+-------+-------+---------------------+-----------
  public      | transactions | table | root  |                   0 | NULL

-- check the schema of the created table
-- note how CockroachDB created its own Primary Key
-- this is also configurable
> SHOW CREATE transactions;
   table_name  |                       create_statement
---------------+--------------------------------------------------------------
  transactions | CREATE TABLE public.transactions (
               |     transaction_id INT8 NOT NULL,
               |     card_id INT8 NOT NULL,
               |     user_id STRING NOT NULL,
               |     purchase_id INT8 NOT NULL,
               |     store_id INT8 NOT NULL,
               |     rowid INT8 NOT VISIBLE NOT NULL DEFAULT unique_rowid(),
               |     CONSTRAINT transactions_pkey PRIMARY KEY (rowid ASC)
               | )

-- inspect a few rows
> SELECT * FROM transactions LIMIT 5;
  transaction_id | card_id | user_id | purchase_id | store_id
-----------------+---------+---------+-------------+-----------
               1 |       7 | User_6  |           0 |        1
               1 |      19 | User_7  |           0 |        6
               1 |      18 | User_5  |           0 |        6
               1 |       6 | User_   |           0 |        3
               1 |      13 | User_   |           0 |        3

-- avoid contention by using AS OF SYSTEM TIME when running large aggregate queries
> SELECT count(*) FROM transactions AS OF SYSTEM TIME '-5s';
   count
----------
  594633

-- wait a few seconds, then query again to see the count increasing...
> SELECT count(*) FROM transactions AS OF SYSTEM TIME '-5s';
   count
----------
  594992
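Two count(*) samples like the ones above give a rough ingest rate once you know how far apart they were taken. A small sketch of that arithmetic follows. The 10-second gap is an assumed figure for illustration, since the elapsed time between the two queries was not recorded, and ingest_rate is a hypothetical helper.

```shell
# Rows ingested per second between two count(*) samples.
# ingest_rate is a hypothetical helper; shell arithmetic is integer-only,
# so the result is rounded down.
ingest_rate() {
  local first="$1" second="$2" seconds="$3"
  echo $(( (second - first) / seconds ))
}

# 594992 - 594633 = 359 new rows; the 10 s interval is an assumption.
ingest_rate 594633 594992 10   # 35
```

For a functional test the number itself matters little; it simply confirms that rows keep arriving.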
You can view more metrics and statement statistics on the DB Console, at http://localhost:8080.
Clean up
Bring down and remove all containers with
docker-compose -f kafka2crdb.yaml down