Strimzi is almost the richest Kubernetes Kafka operator, which you can utilize to deploy Apache Kafka or its other components like Kafka Connect, Kafka Mirror, etc. This article will provide a step-by-step tutorial about deploying Kafka Connect on Kubernetes. I brought all issues I encountered during the deployment procedure and their best mitigation.
Note: Consider that this operator is based on Apache Kafka, not the Confluent Platform. That's why you may need to add some confluent artifacts like Confluent Avro Converter to get the most out of it.
This article is based on Strimzi v0.29.0
. Thus you're able to install the following versions of Kafka Connect:
- Strimzi: 0.29.0
- Apache Kafka & Kafka Connect: Up to 3.2
- Equivalent Confluent Platform: 7.2.4
Note: You can convert Confluent Platform version to Apache Kafka version and vice versa with the provided table here.
Installation
Openshift GUI and Kubernetes CLI
If you're using Openshift, navigate to Operators > installed Operators > Strimzi > Kafka Connect.
Now you will face a form containing the Kafka connect configurations. You can get the equivalent Yaml file of the form by clicking on Yaml View. Any update on the form view will be applied to the Yaml view on the fly. Although the form view is quite straightforward, It's strongly recommended not to use it for creating the instance directly. Use it only for converting your desired configuration to a Yaml file and then deploy the operator with the kubectl apply
command. So to summarize:
- Enter the configuration in the form view
- Click on Yaml view
- Copy its contents to a Yaml file on your local (e.g.
kafka-connect.yaml
) - Run:
kubectl apply -f kafka-connect.yaml
Now the Kafka-Connect kind should be deployed or updated. The deployed resources consist of Deployment and pods, Service, config maps, and secrets.
Let's get through the minimum configuration and make it more advanced, step by step.
Minimum Configuration
To deploy a simple minimum configuration of Kafka Connect, you can use the below Yaml:
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect metadata: name: my-connect-cluster namespace: <YOUR_PROJECT_NAME> spec: config: config.storage.replication.factor: -1 config.storage.topic: okd4-connect-cluster-configs group.id: okd4-connect-cluster offset.storage.replication.factor: -1 offset.storage.topic: okd4-connect-cluster-offsets status.storage.replication.factor: -1 status.storage.topic: okd4-connect-cluster-status bootstrapServers: kafka1, kafka2 version: 3.2.0 replicas: 1
You can have the Kafka Connect Rest API on port 8083 exposed on the pod. You can expose it on a private or internal network by defining a route on OKD.
REST API Authentication
With the configuration explained here, you can add authentication to the Kafka Connect REST proxy. Unfortunately, that doesn't work on the Strimzi operator, as discussed here. So to provide security on Kafka Connect, you've two options:
- Use the Kafka Connector operator API. Strimzi operator lets you have a Connector kind defined in a YAML file. However, it may not be practical for some use cases since updating, pausing, and stopping connectors via the REST API is necessary.
- Put the insecure REST API behind an authenticated API Gateway like Apache APISIX or any other tool or self-developed application.
JMX Prometheus Metrics
To expose JMX Prometheus Metrics, useful for observing connectors statuses in Grafana, add the below configuration:
metricsConfig: type: jmxPrometheusExporter valueFrom: configMapKeyRef: key: jmx-prometheus name: configs jmxOptions: {}
It uses a pre-defined config for Prometheus export. You can use this config:
startDelaySeconds: 0 ssl: false lowercaseOutputName: false lowercaseOutputLabelNames: false rules: - pattern : "kafka.connect<type=connect-worker-metrics>([^:]+):" name: "kafka_connect_connect_worker_metrics_$1" - pattern : "kafka.connect<type=connect-metrics, client-id=([^:]+)><>([^:]+)" name: "kafka_connect_connect_metrics_$2" labels: client: "$1" - pattern: "debezium.([^:]+)<type=connector-metrics, context=([^,]+), server=([^,]+), key=([^>]+)><>RowsScanned" name: "debezium_metrics_RowsScanned" labels: plugin: "$1" name: "$3" context: "$2" table: "$4" - pattern: "debezium.([^:]+)<type=connector-metrics, context=([^,]+), server=([^>]+)>([^:]+)" name: "debezium_metrics_$4" labels: plugin: "$1" name: "$3" context: "$2"
Service for External Prometheus
If you are intended to deploy Prometheus in companion with Strimzi to collect the metrics, follow the instructions here. However, in the case of using external Prometheus, the story goes another way:
Strimzi operator only creates port mapping in Service for these ports:
- 8083: Kafka Connect REST API
- 9999: JMX port
Sadly it doesn't create a mapping for port 9404, the Prometheus exporter HTTP port. So we've to create a service on our own:
kind: Service apiVersion: v1 metadata: name: kafka-connect-jmx-prometheus namespace: kafka-connect labels: app.kubernetes.io/instance: kafka-connect app.kubernetes.io/managed-by: strimzi-cluster-operator app.kubernetes.io/name: kafka-connect app.kubernetes.io/part-of: strimzi-kafka-connect strimzi.io/cluster: kafka-connect strimzi.io/kind: KafkaConnect spec: ports: - name: tcp-prometheus protocol: TCP port: 9404 targetPort: 9404 type: ClusterIP selector: strimzi.io/cluster: kafka-connect strimzi.io/kind: KafkaConnect strimzi.io/name: kafka-connect-connect status: loadBalancer: {}
Note: This method only works for single-pod deployments since you should define a route for the service and even in the case of headless service, the route returns one IP of a pod at a time. Hence, Prometheus can't scrape all pods metrics. That's why it is recommended to use Podmonitor and Prometheus on Cloud. This issue is discussed here
Plugins and Artifacts
To add plugins and artifacts, there are two ways:
Operator Build Section
To add plugins, you can use the operator build section. It gets the plugin or artifact addresses, downloads them in the build stage (The operator creates the build config automatically), and adds them to the plugin directory of the image.
It supports jar, tgz, zip, and maven
. However, in the case of Maven, a multi-stage Dockerfile is created, which is problematic to Openshift, and it faces failure in the build stage. Hence, you should only use other types that don't need compile stage (i.e., jar, zip, tgz) and end up with a single-stage Dockerfile.
For example, to add the Debezium MySQL plugin, you can use the below configuration:
spec: build: output: image: 'kafkaconnect:1.0' type: imagestream plugins: - artifacts: - type: tgz url: >- https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/2.1.4.Final/debezium-connector-mysql-2.1.4.Final-plugin.tar.gz name: debezium-connector-mysql
Note: Strimzi operator is only able to download public artifacts. So if you wish to download a privately secured artifact that is not accessible by Kubernetes, you've to give up this method and follow the next one.
Changing Image
The operator is able to use your desired image instead of its default one. Thus you can add your desired artifacts and plugins by building an image manually or via CI/CD. One of the other reasons why you may want to use this method is that Strimzi uses Apache Kafka image, not the Confluent Platform. So the deployments don't have Confluent useful packages like Confluent Avro Converter, etc. So you need to add them to your image and configure the operator to use your docker image.
For example, If you want to add your customized Debezium MySQL Connector plugin from Gitlab Generic Packages and Confluent Avro Converter to the base image, first use this Dockerfile:
ARG CONFLUENT_VERSION=7.2.4 # Install confluent avro converter FROM confluentinc/cp-kafka-connect:${CONFLUENT_VERSION} as cp # Reassign version ARG CONFLUENT_VERSION RUN confluent-hub install --no-prompt confluentinc/kafka-connect-avro-converter:${CONFLUENT_VERSION} # Copy privious artifacts to the main strimzi kafka image FROM quay.io/strimzi/kafka:0.29.0-kafka-3.2.0 ARG GITLAB_TOKEN ARG CI_API_V4_URL=https://gitlab.snapp.ir/api/v4 ARG CI_PROJECT_ID=3873 ARG DEBEZIUM_CONNECTOR_MYSQL_CUSTOMIZED_VERSION=1.0 USER root:root # Copy Confluent packages from previous stage RUN mkdir -p /opt/kafka/plugins/avro/ COPY --from=cp /usr/share/confluent-hub-components/confluentinc-kafka-connect-avro-converter/lib /opt/kafka/plugins/avro/ # Connector plugin debezium-connector-mysql RUN 'mkdir' '-p' '/opt/kafka/plugins/debezium-connector-mysql' \ && curl --header "${GITLAB_TOKEN}" -f -L \ --output /opt/kafka/plugins/debezium-connector-mysql.tgz \ ${CI_API_V4_URL}/projects/${CI_PROJECT_ID}/packages/generic/debezium-customized/${DEBEZIUM_CONNECTOR_MYSQL_CUSTOMIZED_VERSION}/debezium-connector-mysql-customized.tar.gz \ && 'tar' 'xvfz' '/opt/kafka/plugins/debezium-connector-mysql.tgz' '-C' '/opt/kafka/plugins/debezium-connector-mysql' \ && 'rm' '-vf' '/opt/kafka/plugins/debezium-connector-mysql.tgz' USER 1001
Build the image. Push it to the image stream or any other docker repository and configure the operator by adding the below line:
spec: image: image-registry.openshift-image-registry.svc:5000/kafka-connect/kafkaconnect-customized:1.0
Kafka Authentication
Depending on its type, you need to use different configurations to add Kafka authentication. However, to bring an example, here you can see the configuration for Kafka with SASL/Plaintext mechanism and scram-sha-512:
spec: authentication: passwordSecret: password: kafka-password secretName: mysecrets type: scram-sha-512 username: myuser
No need to say that you must provide the password in a secret file named mysecret.
Handling File Credentials
Since connectors need credentials to access databases, you've to define them as secrets and access them with environment variables. However, if there are too many of them, you can put all credentials in a file and address them in the connector with the $file modifier
:
1- Put all credentials as the value of a key named credentials in a secret file.
Credentials file:
USERNAME_DB_1=user1 PASSWORD_DB_1=pass1 USERNAME_DB_2=user2 PASSWORD_DB_2=pass2
Secret file:
kind: Secret apiVersion: v1 metadata: name: mysecrets namespace: kafka-connect data: credentials: <BASE64 YOUR DATA>
2- Configure the operator with the secret as volume:
spec: config: config.providers: file config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider externalConfiguration: volumes: - name: database_credentials secret: items: - key: credentials path: credentials optional: false secretName: mysecrets
3- Now in the connector, you can access PASSWORD_DB_1 with the below command:
"${file:/opt/kafka/external-configuration/database_credentials/credentials:PASSWORD_DB_1}"
Put it all together
If we put all configurations together, we'll have the below configuration for Kafka Connect:
Service, route and build configuration are ommited since we've discussed earlier in the article.
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect metadata: name: kafka-connect namespace: kafka-connect spec: authentication: passwordSecret: password: kafka-password secretName: mysecrets type: scram-sha-512 username: myuser config: config.providers: file config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider config.storage.replication.factor: -1 config.storage.topic: okd4-connect-cluster-configs group.id: okd4-connect-cluster offset.storage.replication.factor: -1 offset.storage.topic: okd4-connect-cluster-offsets status.storage.replication.factor: -1 status.storage.topic: okd4-connect-cluster-status bootstrapServers: 'kafka1:9092, kafka2:9092' metricsConfig: type: jmxPrometheusExporter valueFrom: configMapKeyRef: key: jmx-prometheus name: configs resources: limits: memory: 1Gi requests: memory: 1Gi readinessProbe: failureThreshold: 10 initialDelaySeconds: 60 periodSeconds: 20 jmxOptions: {} livenessProbe: failureThreshold: 10 initialDelaySeconds: 60 periodSeconds: 20 image: image-registry.openshift-image-registry.svc:5000/kafka-connect/kafkaconnect-customized:1.0 version: 3.2.0 replicas: 2 externalConfiguration: volumes: - name: database_credentials secret: items: - key: credentials path: credentials optional: false secretName: mysecrets
Conclusion
In conclusion, deploying Kafka Connect using the Strimzi Operator can be a powerful and efficient way to manage data integration in your organization. By leveraging the flexibility and scalability of Kafka, along with the ease of use and automation provided by the Strimzi Operator, you can streamline your data pipelines and improve your data-driven decision-making. In this article, I've covered the key steps involved in deploying Kafka Connect via the Strimzi Operator, including creating its minimal custom resource definition (CRD), REST API Basic authentication issue, Kafka Authentication, JMX Prometheus metrics, plugins and artifacts and handling file credentials. Following these steps, you can easily customize your Kafka Connect deployment to meet your specific needs.
Top comments (0)