DEV Community

Cover image for Connecting to Kafka Cluster running on Kubernetes from your Local Machine : CLI & Programatic Access
karan singh
karan singh

Posted on

Connecting to Kafka Cluster running on Kubernetes from your Local Machine : CLI & Programatic Access

Introduction

Why do you need this?

  • For local development you want to connect to a remote Kafka Cluster running on OpenShift , that is deployed using Strimzi Operator

Prerequisite

  • OpenShift Container Platform or OKD

  • Strimzi Operator deployed

Deploy Kafka Cluster

  • Create a YAML file with these contents (only for dev/test clusters)
 apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka metadata: name: my-cluster namespace: nestjs-testing spec: entityOperator: topicOperator: {} userOperator: {} kafka: config: inter.broker.protocol.version: "2.8" log.message.format.version: "2.8" offsets.topic.replication.factor: 3 transaction.state.log.min.isr: 2 transaction.state.log.replication.factor: 3 listeners: - name: plain port: 9092 tls: false type: internal - name: tls port: 9093 tls: true type: internal - name: route port: 9094 tls: true type: route replicas: 3 storage: type: ephemeral version: 2.8.0 zookeeper: replicas: 3 storage: type: ephemeral 
Enter fullscreen mode Exit fullscreen mode

Preparing to Connect

 oc get secret my-cluster-cluster-ca-cert -o jsonpath='{.data.ca\.crt}' | base64 -d > ca.crt keytool -import -trustcacerts -alias root -file ca.crt -keystore truststore.jks -storepass password -noprompt # This should create 2 files in PWD ls -l *.crt *.jks 
Enter fullscreen mode Exit fullscreen mode

Grab Kafka Endpoint

KAFKA_ENDPOINT=$(oc get kafka my-cluster -o=jsonpath='{.status.listeners[?(@.type=="route")].bootstrapServers}{"\n"}') 
Enter fullscreen mode Exit fullscreen mode

Connecting from CLI (Kafka Console Producer/Consumer)

  • Get Kafka Console Producer & Consumer script files
 wget [https://dlcdn.apache.org/kafka/3.0.0/kafka_2.13-3.0.0.tgz](https://dlcdn.apache.org/kafka/3.0.0/kafka_2.13-3.0.0.tgz) ; tar -xvf kafka_2.13-3.0.0.tgz 
Enter fullscreen mode Exit fullscreen mode
  • Console Producer
 kafka_2.13-3.0.0/bin/kafka-console-producer.sh --broker-list $KAFKA_ENDPOINT --producer-property security.protocol=SSL --producer-property ssl.truststore.password=password --producer-property ssl.truststore.location=truststore.jks --topic my-topic 
Enter fullscreen mode Exit fullscreen mode
  • Console Consumer
 kafka_2.13-3.0.0/bin/kafka-console-consumer.sh --bootstrap-server $KAFKA_ENDPOINT --topic my-topic --from-beginning --consumer-property security.protocol=SSL --consumer-property ssl.truststore.password=password --consumer-property ssl.truststore.location=truststore.jks 
Enter fullscreen mode Exit fullscreen mode

Connecting from Python Client (running locally)

from kafka import KafkaProducer, KafkaConsumer import json from bson import json_util bootstrap_server = 'my-cluster-kafka-route-bootstrap-nestjs-testing.apps.ocp.ceph-s3.com:443' print("Producing messages to Kafka topic ...") producer = KafkaProducer(bootstrap_servers=bootstrap_server, ssl_cafile='ca.crt', security_protocol="SSL") for i in range(10): message = {'value': i} producer.send('my-topic', json.dumps(message, default=json_util.default).encode('utf-8')) print("Consuming messages from Kafka topic ...") consumer = KafkaConsumer('my-topic', group_id='my-group', bootstrap_servers=bootstrap_server, ssl_cafile='ca.crt', security_protocol="SSL", consumer_timeout_ms=10000, enable_auto_commit=True) for message in consumer: # message value and key are raw bytes -- decode if necessary! # e.g., for unicode: `message.value.decode('utf-8')` print ("%s:%d:%d: value=%s" % (message.topic, message.partition,message.offset,message.value)) 
Enter fullscreen mode Exit fullscreen mode

Output of Kafka Python Producer & Consumer example

This is how you can connect to a remote Kafka cluster from your local machine. This is handy when you are developing locally and eventually deploying that to your OpenShift environment.

Top comments (0)